Core

%load_ext autoreload
%autoreload 2
import copy
import inspect
import reprlib
import warnings
from collections import Counter, OrderedDict
from contextlib import contextmanager
from pathlib import Path
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Mapping,
    Optional,
    Tuple,
    Union,
)

import cloudpickle
import fsspec
import numpy as np
import pandas as pd
import utilsforecast.processing as ufp
from sklearn.base import BaseEstimator, clone
from sklearn.pipeline import Pipeline
from utilsforecast.compat import (
    DFType,
    DataFrame,
    pl,
    pl_DataFrame,
    pl_Series,
)
from utilsforecast.validation import validate_format, validate_freq

from mlforecast.grouped_array import GroupedArray
from mlforecast.lag_transforms import _BaseLagTransform, Lag
from mlforecast.target_transforms import (
    _BaseGroupedArrayTargetTransform,
    BaseTargetTransform,
)
from mlforecast.utils import _ShortSeriesException, _ensure_shallow_copy
import datetime

import tempfile
from nbdev import show_doc
from fastcore.test import test_eq, test_fail, test_warns
from window_ops.expanding import expanding_mean
from window_ops.rolling import rolling_mean
from window_ops.shift import shift_array

from mlforecast.callbacks import SaveFeatures
from mlforecast.lag_transforms import ExpandingMean, RollingMean
from mlforecast.target_transforms import Differences, LocalStandardScaler
from mlforecast.utils import generate_daily_series, generate_prices_for_series

Data format

The required input format is a dataframe with at least the following columns:

* unique_id with a unique identifier for each time series
* ds with the datestamp
* y with the values of the series

Every other column is considered a static feature unless stated otherwise in TimeSeries.fit.

series = generate_daily_series(20, n_static_features=2)
series
unique_id ds y static_0 static_1
0 id_00 2000-01-01 7.404529 27 53
1 id_00 2000-01-02 35.952624 27 53
2 id_00 2000-01-03 68.958353 27 53
3 id_00 2000-01-04 84.994505 27 53
4 id_00 2000-01-05 113.219810 27 53
... ... ... ... ... ...
4869 id_19 2000-03-25 400.606807 97 45
4870 id_19 2000-03-26 538.794824 97 45
4871 id_19 2000-03-27 620.202104 97 45
4872 id_19 2000-03-28 20.625426 97 45
4873 id_19 2000-03-29 141.513169 97 45

4874 rows × 5 columns

For simplicity, we'll only consider a single time series here.

uids = series['unique_id'].unique()
serie = series[series['unique_id'].eq(uids[0])]
serie
unique_id ds y static_0 static_1
0 id_00 2000-01-01 7.404529 27 53
1 id_00 2000-01-02 35.952624 27 53
2 id_00 2000-01-03 68.958353 27 53
3 id_00 2000-01-04 84.994505 27 53
4 id_00 2000-01-05 113.219810 27 53
... ... ... ... ... ...
217 id_00 2000-08-05 13.263188 27 53
218 id_00 2000-08-06 38.231981 27 53
219 id_00 2000-08-07 59.555183 27 53
220 id_00 2000-08-08 86.986368 27 53
221 id_00 2000-08-09 119.254810 27 53

222 rows × 5 columns

date_features_dtypes = {
    "year": np.uint16,
    "month": np.uint8,
    "day": np.uint8,
    "hour": np.uint8,
    "minute": np.uint8,
    "second": np.uint8,
    "dayofyear": np.uint16,
    "day_of_year": np.uint16,
    "weekofyear": np.uint8,
    "week": np.uint8,
    "dayofweek": np.uint8,
    "day_of_week": np.uint8,
    "weekday": np.uint8,
    "quarter": np.uint8,
    "daysinmonth": np.uint8,
    "is_month_start": np.uint8,
    "is_month_end": np.uint8,
    "is_quarter_start": np.uint8,
    "is_quarter_end": np.uint8,
    "is_year_start": np.uint8,
    "is_year_end": np.uint8,
}
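
For illustration (this snippet is not part of the library), the mapping above is what _compute_date_feature uses further below to cast computed date features to compact dtypes, e.g. dayofweek fits in a uint8:

# illustrative: cast a computed date feature to its registered compact dtype
dates = pd.DatetimeIndex(['2000-01-01', '2000-01-02'])
dow = np.asarray(dates.dayofweek).astype(date_features_dtypes['dayofweek'])
assert dow.dtype == np.uint8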
def _build_function_transform_name(tfm: Callable, lag: int, *args) -> str:
    """基于`lag`、函数名称及其参数,创建一个转换的名称。"""
    tfm_name = f"{tfm.__name__}_lag{lag}"
    func_params = inspect.signature(tfm).parameters
    func_args = list(func_params.items())[1:]  # remove the input array argument
    changed_params = [
        f"{name}{value}"
        for value, (name, arg) in zip(args, func_args)
        if arg.default != value
    ]
    if changed_params:
        tfm_name += "_" + "_".join(changed_params)
    return tfm_name
test_eq(_build_function_transform_name(expanding_mean, 1), 'expanding_mean_lag1')
test_eq(_build_function_transform_name(rolling_mean, 2, 7), 'rolling_mean_lag2_window_size7')
def _build_lag_transform_name(tfm: _BaseLagTransform, lag: int) -> str:
    return tfm._get_name(lag)
test_eq(_build_lag_transform_name(ExpandingMean(), 1), 'expanding_mean_lag1')
test_eq(_build_lag_transform_name(RollingMean(7), 2), 'rolling_mean_lag2_window_size7')
def _build_transform_name(
    tfm: Union[Callable, _BaseLagTransform], lag: int, *args
) -> str:
    if callable(tfm):
        name = _build_function_transform_name(tfm, lag, *args)
    else:
        name = _build_lag_transform_name(tfm, lag)
    return name
def _get_model_name(model) -> str:
    if isinstance(model, Pipeline):
        return _get_model_name(model.steps[-1][1])
    return model.__class__.__name__
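
A quick check of the Pipeline handling (illustrative; StandardScaler and LinearRegression are just stand-ins): for a pipeline the name comes from its final step.

from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler

pipe = Pipeline([('scaler', StandardScaler()), ('model', LinearRegression())])
test_eq(_get_model_name(pipe), 'LinearRegression')
test_eq(_get_model_name(LinearRegression()), 'LinearRegression')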

def _name_models(current_names):
    ctr = Counter(current_names)
    if not ctr:
        return []
    if max(ctr.values()) < 2:
        return current_names
    names = current_names.copy()
    for i, x in enumerate(reversed(current_names), start=1):
        count = ctr[x]
        if count > 1:
            name = f"{x}{count}"
            ctr[x] -= 1
        else:
            name = x
        names[-i] = name
    return names
# one duplicate
names = ['a', 'b', 'a', 'c']
expected = ['a', 'b', 'a2', 'c']
actual = _name_models(names)
assert actual == expected

# no duplicates
names = ['a', 'b', 'c']
actual = _name_models(names)
assert actual == names
def _as_tuple(x):
    """从输入中返回一个元组。"""
    if isinstance(x, tuple):
        return x
    return (x,)
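
A couple of quick checks (added here for illustration): scalars get wrapped in a tuple while tuples pass through unchanged.

test_eq(_as_tuple(rolling_mean), (rolling_mean,))
test_eq(_as_tuple((rolling_mean, 7)), (rolling_mean, 7))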
Freq = Union[int, str]
Lags = Iterable[int]
LagTransform = Union[Callable, Tuple[Callable, Any]]
LagTransforms = Dict[int, List[LagTransform]]
DateFeature = Union[str, Callable]
Models = Union[BaseEstimator, List[BaseEstimator], Dict[str, BaseEstimator]]
TargetTransform = Union[BaseTargetTransform, _BaseGroupedArrayTargetTransform]
Transforms = Dict[str, Union[Tuple[Any, ...], _BaseLagTransform]]
def _parse_transforms(
    lags: Lags,
    lag_transforms: LagTransforms,
    namer: Optional[Callable] = None,
) -> Transforms:
    transforms: Transforms = OrderedDict()
    if namer is None:
        namer = _build_transform_name
    for lag in lags:
        transforms[f'lag{lag}'] = Lag(lag)
    for lag in lag_transforms.keys():
        for tfm in lag_transforms[lag]:
            if isinstance(tfm, _BaseLagTransform):
                tfm_name = namer(tfm, lag)
                transforms[tfm_name] = clone(tfm)._set_core_tfm(lag)
            else:
                tfm, *args = _as_tuple(tfm)
                assert callable(tfm)
                tfm_name = namer(tfm, lag, *args)
                transforms[tfm_name] = (lag, tfm, *args)
    return transforms
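
As a quick sanity check (illustrative, not part of the library), the parsed transforms map feature names to either Lag instances or (lag, function, *args) tuples:

tfms = _parse_transforms(lags=[1], lag_transforms={1: [(rolling_mean, 7)]})
test_eq(list(tfms.keys()), ['lag1', 'rolling_mean_lag1_window_size7'])
test_eq(tfms['rolling_mean_lag1_window_size7'], (1, rolling_mean, 7))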
class TimeSeries:
    """用于存储和转换时间序列数据的实用类。"""

    def __init__(
        self,
        freq: Freq,
        lags: Optional[Lags] = None,
        lag_transforms: Optional[LagTransforms] = None,
        date_features: Optional[Iterable[DateFeature]] = None,
        num_threads: int = 1,
        target_transforms: Optional[List[TargetTransform]] = None,
        lag_transforms_namer: Optional[Callable] = None,
    ):
        self.freq = freq
        if not isinstance(num_threads, int) or num_threads < 1:
            warnings.warn('Setting num_threads to 1.')
            num_threads = 1
        self.lags = [] if lags is None else list(lags)
        for lag in self.lags:
            if lag <= 0 or not isinstance(lag, int):
                raise ValueError('lags must be positive integers.')
        self.lag_transforms = {} if lag_transforms is None else lag_transforms
        for lag in self.lag_transforms.keys():
            if lag <= 0 or not isinstance(lag, int):
                raise ValueError('keys of lag_transforms must be positive integers.')
        self.date_features = [] if date_features is None else list(date_features)
        self.num_threads = num_threads
        self.target_transforms = target_transforms
        if self.target_transforms is not None:
            for tfm in self.target_transforms:
                if isinstance(tfm, _BaseGroupedArrayTargetTransform):
                    tfm.set_num_threads(num_threads)
        for feature in self.date_features:
            if callable(feature) and feature.__name__ == '<lambda>':
                raise ValueError(
                    "Can't use a lambda as a date feature because the function name gets used as the feature name."
                )
        self.lag_transforms_namer = lag_transforms_namer
        self.transforms = _parse_transforms(
            lags=self.lags, lag_transforms=self.lag_transforms, namer=lag_transforms_namer
        )
        self.ga: GroupedArray

    def _get_core_lag_tfms(self) -> Dict[str, _BaseLagTransform]:
        return {k: v for k, v in self.transforms.items() if isinstance(v, _BaseLagTransform)}

    @property
    def _date_feature_names(self):
        return [f.__name__ if callable(f) else f for f in self.date_features]
        
    @property
    def features(self) -> List[str]:
        """所有计算特征的名称。"""
        return list(self.transforms.keys()) + self._date_feature_names

    def __repr__(self):
        return (
            f"TimeSeries(freq={self.freq}, "
            f"transforms={list(self.transforms.keys())}, "
            f"date_features={self._date_feature_names}, "
            f"num_threads={self.num_threads})"
        )

    def _fit(
        self,
        df: DataFrame,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        keep_last_n: Optional[int] = None,
    ) -> 'TimeSeries':
        """保存系列值、ID和最后日期。"""
        validate_format(df, id_col, time_col, target_col)
        validate_freq(df[time_col], self.freq)
        if ufp.is_nan_or_none(df[target_col]).any():
            raise ValueError(f'{target_col} column contains null values.')
        self.id_col = id_col
        self.target_col = target_col
        self.time_col = time_col
        self.keep_last_n = keep_last_n
        self.static_features = static_features
        sorted_df = df[[id_col, time_col, target_col]]
        sorted_df = ufp.copy_if_pandas(sorted_df, deep=False)
        uids, times, data, indptr, self._sort_idxs = ufp.process_df(
            df=sorted_df,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        if data.ndim == 2:
            data = data[:, 0]
        ga = GroupedArray(data, indptr)
        if isinstance(df, pd.DataFrame):
            self.uids = pd.Index(uids)
            self.last_dates = pd.Index(times)
        else:
            self.uids = uids
            self.last_dates = pl_Series(times)
        if self._sort_idxs is not None:
            self._restore_idxs: Optional[np.ndarray] = np.empty(df.shape[0], dtype=np.int32)
            self._restore_idxs[self._sort_idxs] = np.arange(df.shape[0])
            sorted_df = ufp.take_rows(sorted_df, self._sort_idxs)
        else:
            self._restore_idxs = None
        if self.target_transforms is not None:
            for tfm in self.target_transforms:
                if isinstance(tfm, _BaseGroupedArrayTargetTransform):
                    try:
                        ga = tfm.fit_transform(ga)
                    except _ShortSeriesException as exc:
                        tfm_name = tfm.__class__.__name__
                        uids = reprlib.repr(list(self.uids[exc.args]))
                        raise ValueError(
                            f"The following series are too short for the '{tfm_name}' transformation: {uids}."
                        ) from None
                    sorted_df = ufp.assign_columns(sorted_df, target_col, ga.data)
                else:
                    tfm.set_column_names(id_col, time_col, target_col)
                    sorted_df = tfm.fit_transform(sorted_df)
                    ga.data = sorted_df[target_col].to_numpy()
        to_drop = [id_col, time_col, target_col]
        if static_features is None:
            static_features = [c for c in df.columns if c not in [time_col, target_col]]
        elif id_col not in static_features:
            static_features = [id_col] + static_features
        else:  # static_features defined and contain id_col
            to_drop = [time_col, target_col]
        self.ga = ga
        series_starts = ga.indptr[:-1]
        series_ends = ga.indptr[1:] - 1
        if self._sort_idxs is not None:
            series_starts = self._sort_idxs[series_starts]
            series_ends = self._sort_idxs[series_ends]
        statics_on_starts = ufp.drop_index_if_pandas(
            ufp.take_rows(df, series_starts)[static_features]
        )
        statics_on_ends = ufp.drop_index_if_pandas(
            ufp.take_rows(df, series_ends)[static_features]
        )
        for feat in static_features:
            if (statics_on_starts[feat] != statics_on_ends[feat]).any():
                raise ValueError(
                    f'{feat} is declared as a static feature but its values change '
                    'over time. Please set the `static_features` argument to '
                    'indicate which features are static.\nIf all of your features '
                    'are dynamic please set `static_features=[]`.'
                )
        self.static_features_ = statics_on_ends
        self.features_order_ = [c for c in df.columns if c not in to_drop] + self.features
        return self

    def _compute_transforms(
        self,
        transforms: Mapping[str, Union[Tuple[Any, ...], _BaseLagTransform]],
        updates_only: bool,
    ) -> Dict[str, np.ndarray]:
        """计算构造函数中定义的变换。

        如果 `self.num_threads > 1`,这些计算将使用多线程进行。"""
        if self.num_threads == 1 or len(transforms) == 1:
            out = self.ga.apply_transforms(
                transforms=transforms, updates_only=updates_only
            )
        else:
            out = self.ga.apply_multithreaded_transforms(
                transforms=transforms,
                num_threads=self.num_threads,
                updates_only=updates_only,
            )
        return out
    
    def _compute_date_feature(self, dates, feature): 
        if callable(feature):
            feat_name = feature.__name__
            feat_vals = feature(dates)
        else:
            feat_name = feature
            if isinstance(dates, pd.DatetimeIndex):
                if feature in ('week', 'weekofyear'):
                    dates = dates.isocalendar()
                feat_vals = getattr(dates, feature)
            else:
                feat_vals = getattr(dates.dt, feature)()
        if isinstance(feat_vals, (pd.Index, pd.Series)):
            feat_vals = np.asarray(feat_vals)
            feat_dtype = date_features_dtypes.get(feature)
            if feat_dtype is not None:
                feat_vals = feat_vals.astype(feat_dtype)
        return feat_name, feat_vals

    def _transform(
        self,
        df: DFType,
        dropna: bool = True,
        max_horizon: Optional[int] = None,
        return_X_y: bool = False,
        as_numpy: bool = False,
    ) -> DFType:
        """将这些功能添加到 `df` 中。

如果 `dropna=True`,则所有空值行都会被删除。"""
        transforms = {k: v for k, v in self.transforms.items() if k not in df}
        features = self._compute_transforms(transforms=transforms, updates_only=False)
        if self._restore_idxs is not None:
            for k, v in features.items():
                features[k] = v[self._restore_idxs]

        # target
        self.max_horizon = max_horizon
        if max_horizon is None:
            target = self.ga.data
        else:
            target = self.ga.expand_target(max_horizon)
        if self._restore_idxs is not None:
            target = target[self._restore_idxs]       

        # determine rows to keep
        if dropna:
            feature_nulls = np.full(df.shape[0], False)
            for feature_vals in features.values():
                feature_nulls |= np.isnan(feature_vals)
            target_nulls = np.isnan(target)
            if target_nulls.ndim == 2:
                # target nulls for each horizon are dropped in MLForecast.fit_models
                # we just drop rows here for which all the target values are null
                target_nulls = target_nulls.all(axis=1)
            keep_rows = ~(feature_nulls | target_nulls)
            for k, v in features.items():
                features[k] = v[keep_rows]
            target = target[keep_rows]
            df = ufp.filter_with_mask(df, keep_rows)
            df = ufp.copy_if_pandas(df, deep=False)
            last_idxs = self.ga.indptr[1:] - 1
            if self._sort_idxs is not None:
                last_idxs = self._sort_idxs[last_idxs]
            last_vals_nan = ~keep_rows[last_idxs]
            if last_vals_nan.any():
                self._dropped_series: Optional[np.ndarray] = np.where(last_vals_nan)[0]                
                dropped_ids = reprlib.repr(list(self.uids[self._dropped_series]))
                warnings.warn(
                    "The following series were dropped completely "
                    f"due to the transformations and features: {dropped_ids}.\n"
                    "These series won't show up if you use `MLForecast.forecast_fitted_values()`.\n"
                    "You can set `dropna=False` or use transformations that require less samples to mitigate this"
                )
            else:
                self._dropped_series = None
        elif isinstance(df, pd.DataFrame):
            df = df.copy(deep=False)
            self._dropped_series = None

        # once we've computed the features and target we can slice the series
        if self.keep_last_n is not None:
            self.ga = self.ga.take_from_groups(slice(-self.keep_last_n, None))         
        del self._restore_idxs, self._sort_idxs

        # lag transforms
        for feat in transforms.keys():
            df = ufp.assign_columns(df, feat, features[feat])

        # date features
        names = [f.__name__ if callable(f) else f for f in self.date_features]
        date_features = [f for f, name in zip(self.date_features, names) if name not in df]
        if date_features:
            unique_dates = df[self.time_col].unique()
            if isinstance(df, pd.DataFrame):
                # all kinds of trickery to make this fast
                unique_dates = pd.Index(unique_dates)
                date2pos = {date: i for i, date in enumerate(unique_dates)}
                restore_idxs = df[self.time_col].map(date2pos)
                for feature in date_features:
                    feat_name, feat_vals = self._compute_date_feature(unique_dates, feature)
                    df[feat_name] = feat_vals[restore_idxs]
            elif isinstance(df, pl_DataFrame):
                exprs = []
                for feat in date_features:  # type: ignore
                    name, vals = self._compute_date_feature(pl.col(self.time_col), feat)
                    exprs.append(vals.alias(name))
                feats = unique_dates.to_frame().with_columns(*exprs)
                df = df.join(feats, on=self.time_col, how='left')

        # assemble return
        if return_X_y:
            X = df[self.features_order_]
            if as_numpy:
                X = ufp.to_numpy(X)
            return X, target
        if max_horizon is not None:
            # remove original target
            out_cols = [c for c in df.columns if c != self.target_col]
            df = df[out_cols]
            target_names = [f"{self.target_col}{i}" for i in range(max_horizon)]
            df = ufp.assign_columns(df, target_names, target)
        else:
            if isinstance(df, pd.DataFrame):
                df = _ensure_shallow_copy(df)
            df = ufp.assign_columns(df, self.target_col, target)
        return df


    def fit_transform(
        self,
        data: DFType,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        max_horizon: Optional[int] = None,
        return_X_y: bool = False,
        as_numpy: bool = False,
    ) -> Union[DFType, Tuple[DFType, np.ndarray]]:
        """Add the features to `data` and save the required information for the predictions step.
        
        If not all features are static, specify which ones are in `static_features`.
        If you don't want to drop rows with null values after the transformations set `dropna=False`.
        If `keep_last_n` is not None then that number of observations is kept across all series for updates.
        """
        self.dropna = dropna
        self.as_numpy = as_numpy
        self._fit(
            df=data,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            keep_last_n=keep_last_n,
        )
        return self._transform(
            df=data,
            dropna=dropna,
            max_horizon=max_horizon,
            return_X_y=return_X_y,
            as_numpy=as_numpy,
        )

    def _update_y(self, new: np.ndarray) -> None:
        """将`new`中的元素附加到每个时间序列中。

        这些值用于更新转换,并作为预测结果存储。"""
        if not hasattr(self, 'y_pred'):
            self.y_pred = []
        self.y_pred.append(new)
        new_arr = np.asarray(new)
        self.ga = self.ga.append(new_arr)
        
    def _update_features(self) -> DataFrame:
        """使用时间序列的最新值计算所有特征的当前值。"""
        self.curr_dates: Union[pd.Index, pl_Series] = ufp.offset_times(self.curr_dates, self.freq, 1)
        self.test_dates.append(self.curr_dates)
 
        features = self._compute_transforms(self.transforms, updates_only=True)
 
        for feature in self.date_features:
            feat_name, feat_vals = self._compute_date_feature(self.curr_dates, feature)
            features[feat_name] = feat_vals

        if isinstance(self.last_dates, pl_Series):
            df_constructor = pl_DataFrame
        else:
            df_constructor = pd.DataFrame
        features_df = df_constructor(features)[self.features]
        return ufp.horizontal_concat([self.static_features_, features_df])

    def _get_raw_predictions(self) -> np.ndarray:
        return np.array(self.y_pred).ravel('F')

    def _get_future_ids(self, h: int):
        if isinstance(self.uids, pl_Series):
            uids = pl.concat([self.uids for _ in range(h)]).sort()
        else:
            uids = pd.Series(
                np.repeat(self.uids, h), name=self.id_col, dtype=self.uids.dtype
            )
        return uids

    def _get_predictions(self) -> DataFrame:
        """获取所有预测值及其对应的ID和时间戳。"""
        h = len(self.y_pred)
        if isinstance(self.uids, pl_Series):
            df_constructor = pl_DataFrame
        else:
            df_constructor = pd.DataFrame
        uids = self._get_future_ids(h)
        df = df_constructor(
            {
                self.id_col: uids,
                self.time_col: np.array(self.test_dates).ravel('F'),
                f'{self.target_col}_pred': self._get_raw_predictions(),
            },
        )
        return df

    def _get_features_for_next_step(self, X_df=None):
        new_x = self._update_features()
        if X_df is not None:
            n_series = len(self.uids)
            h = X_df.shape[0] // n_series
            rows = np.arange(self._h, X_df.shape[0], h)
            X = ufp.take_rows(X_df, rows)
            X = ufp.drop_index_if_pandas(X)
            new_x = ufp.horizontal_concat([new_x, X])
        if isinstance(new_x, pd.DataFrame):
            nulls = new_x.isnull().any()
            cols_with_nulls = nulls[nulls].index.tolist()
        else:
            nulls = new_x.select(pl.all().is_null().any())
            cols_with_nulls = [k for k, v in nulls.to_dicts()[0].items() if v]
        if cols_with_nulls:
            warnings.warn(
                f'Found null values in {", ".join(cols_with_nulls)}.'
            )
        self._h += 1
        new_x = new_x[self.features_order_]
        if self.as_numpy:
            new_x = ufp.to_numpy(new_x)
        return new_x

    @contextmanager
    def _backup(self) -> Iterator[None]:
        # this gets modified during the predict step, since the predictions are appended
        ga = copy.copy(self.ga)
        # if these save state (like ExpandingMean) they'll get modified by the updates
        lag_tfms = copy.deepcopy(self.transforms)
        try:
            yield
        finally:
            self.ga = ga
            self.transforms = lag_tfms

    def _predict_setup(self) -> None:
        # TODO: move to utils
        if isinstance(self.last_dates, pl_Series):
            self.curr_dates  = self.last_dates.clone()
        else:
            self.curr_dates  = self.last_dates.copy()
        self.test_dates: List[Union[pd.Index, pl_Series]] = []
        self.y_pred = []
        self._h = 0

    def _predict_recursive(
        self,
        models: Dict[str, BaseEstimator],
        horizon: int,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        X_df: Optional[DFType] = None,
    ) -> DFType:
        """使用 `model` 预测未来 `horizon` 个时间步。"""
        for i, (name, model) in enumerate(models.items()):
            with self._backup():
                self._predict_setup()
                for _ in range(horizon):
                    new_x = self._get_features_for_next_step(X_df)
                    if before_predict_callback is not None:
                        new_x = before_predict_callback(new_x)
                    predictions = model.predict(new_x)
                    if after_predict_callback is not None:
                        predictions = after_predict_callback(predictions)
                    self._update_y(predictions)
                if i == 0:
                    preds = self._get_predictions()
                    rename_dict = {f'{self.target_col}_pred': name}
                    preds = ufp.rename(preds, rename_dict)
                else:
                    raw_preds = self._get_raw_predictions()
                    preds = ufp.assign_columns(preds, name, raw_preds)
        return preds

    def _predict_multi(
        self,
        models: Dict[str, BaseEstimator],
        horizon: int,
        before_predict_callback: Optional[Callable] = None,
        X_df: Optional[DFType] = None,
    ) -> DFType:
        assert self.max_horizon is not None
        if horizon > self.max_horizon:
            raise ValueError(f'horizon must be at most max_horizon ({self.max_horizon})')
        self._predict_setup()
        uids = self._get_future_ids(horizon)
        starts = ufp.offset_times(self.curr_dates, self.freq, 1)
        dates = ufp.time_ranges(starts, self.freq, periods=horizon)
        if isinstance(self.curr_dates, pl_Series):
            df_constructor = pl_DataFrame
        else:
            df_constructor = pd.DataFrame
        result = df_constructor({self.id_col: uids, self.time_col: dates})
        for name, model in models.items():
            with self._backup():
                new_x = self._get_features_for_next_step(X_df)
                if before_predict_callback is not None:
                    new_x = before_predict_callback(new_x)
                predictions = np.empty((new_x.shape[0], horizon))
                for i in range(horizon):
                    predictions[:, i] = model[i].predict(new_x)
                raw_preds = predictions.ravel()
                result = ufp.assign_columns(result, name, raw_preds)
        return result

    def _has_ga_target_tfms(self):
        return any(isinstance(tfm, _BaseGroupedArrayTargetTransform) for tfm in self.target_transforms)

    @contextmanager
    def _maybe_subset(self, idxs: Optional[np.ndarray]) -> Iterator[None]:
        # save original
        ga = self.ga
        uids = self.uids
        statics = self.static_features_
        last_dates = self.last_dates
        targ_tfms = copy.copy(self.target_transforms)
        lag_tfms = copy.deepcopy(self.transforms)

        if idxs is not None:
            # assign subsets
            self.ga = self.ga.take(idxs)            
            self.uids = uids[idxs]
            self.static_features_ = ufp.take_rows(statics, idxs)
            self.static_features_ = ufp.drop_index_if_pandas(self.static_features_)
            self.last_dates = last_dates[idxs]
            if self.target_transforms is not None:
                for i, tfm in enumerate(self.target_transforms):
                    if isinstance(tfm, _BaseGroupedArrayTargetTransform):
                        self.target_transforms[i] = tfm.take(idxs)
            for name, lag_tfm in self.transforms.items():
                if isinstance(lag_tfm, _BaseLagTransform):
                    lag_tfm = lag_tfm.take(idxs)
                self.transforms[name] = lag_tfm
        try:
            yield
        finally:
            self.ga = ga
            self.uids = uids
            self.static_features_ = statics
            self.last_dates = last_dates
            self.target_transforms = targ_tfms
            self.transforms = lag_tfms

    def predict(
        self,
        models: Dict[str, Union[BaseEstimator, List[BaseEstimator]]],
        horizon: int,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        X_df: Optional[DFType] = None,
        ids: Optional[List[str]] = None,
    ) -> DFType:
        if ids is not None:
            unseen = set(ids) - set(self.uids)
            if unseen:
                raise ValueError(f"The following ids weren't seen during training and thus can't be forecasted: {unseen}")
            idxs: Optional[np.ndarray] = np.where(ufp.is_in(self.uids, ids))[0]
        else:
            idxs = None
        with self._maybe_subset(idxs):
            if X_df is not None:
                if self.id_col not in X_df or self.time_col not in X_df:
                    raise ValueError(f"X_df must have '{self.id_col}' and '{self.time_col}' columns.")
                if X_df.shape[1] < 3:
                    raise ValueError("Found no exogenous features in `X_df`.")
                statics = [c for c in self.static_features_.columns if c != self.id_col]
                dynamics = [c for c in X_df.columns if c not in [self.id_col, self.time_col]]
                common = [c for c in dynamics if c in statics]
                if common:
                    raise ValueError(
                        f"The following features were provided through `X_df` but were considered as static during fit: {common}.\n"
                        "Please re-run the fit step using the `static_features` argument to indicate which features are static. "
                        "If all your features are dynamic please pass an empty list (static_features=[])."
                    )
                starts = ufp.offset_times(self.last_dates, self.freq, 1)
                ends = ufp.offset_times(self.last_dates, self.freq, horizon)
                dates_validation = type(X_df)(
                    {
                        self.id_col: self.uids,
                        '_start': starts,
                        '_end': ends,
                    }
                )
                X_df = ufp.join(X_df, dates_validation, on=self.id_col)
                mask = ufp.between(X_df[self.time_col], X_df['_start'], X_df['_end'])
                X_df = ufp.filter_with_mask(X_df, mask)
                if X_df.shape[0] != len(self.uids) * horizon:
                    msg = (
                        "Found missing inputs in X_df. "
                        "It should have one row per id and time for the complete forecasting horizon.\n"
                        "You can get the expected structure by running `MLForecast.make_future_dataframe(h)` "
                        "or get the missing combinatins in your current `X_df` by running `MLForecast.get_missing_future(h, X_df)`."
                    ) 
                    raise ValueError(msg)
                drop_cols = [self.id_col, self.time_col, '_start', '_end']
                X_df = ufp.sort(X_df, [self.id_col, self.time_col])
                X_df = ufp.drop_columns(X_df, drop_cols)
            if getattr(self, 'max_horizon', None) is None:
                preds = self._predict_recursive(
                    models=models,
                    horizon=horizon,
                    before_predict_callback=before_predict_callback,
                    after_predict_callback=after_predict_callback,
                    X_df=X_df,
                )
            else:
                preds = self._predict_multi(
                    models=models,
                    horizon=horizon,
                    before_predict_callback=before_predict_callback,
                    X_df=X_df,
                )
            if self.target_transforms is not None:
                if self._has_ga_target_tfms():
                    model_cols = [c for c in preds.columns if c not in (self.id_col, self.time_col)]
                    indptr = np.arange(0, horizon * (len(self.uids) + 1), horizon)
                for tfm in self.target_transforms[::-1]:
                    if isinstance(tfm, _BaseGroupedArrayTargetTransform):
                        for col in model_cols:
                            ga = GroupedArray(preds[col].to_numpy().astype(self.ga.data.dtype), indptr)
                            ga = tfm.inverse_transform(ga)
                            preds = ufp.assign_columns(preds, col, ga.data)
                    else:
                        preds = tfm.inverse_transform(preds)
        return preds

    def save(self, path: Union[str, Path]) -> None:
        with fsspec.open(path, 'wb') as f:
            cloudpickle.dump(self, f)

    @staticmethod
    def load(
        path: Union[str, Path], protocol: Optional[str] = None
    ) -> 'TimeSeries':
        with fsspec.open(path, 'rb', protocol=protocol) as f:
            ts = cloudpickle.load(f)
        return ts

    def update(self, df: DataFrame) -> None:
        """更新存储序列的值。"""
        validate_format(df, self.id_col, self.time_col, self.target_col)
        uids = self.uids
        if isinstance(uids, pd.Index):
            uids = pd.Series(uids)
        uids, new_ids = ufp.match_if_categorical(uids, df[self.id_col])
        df = ufp.copy_if_pandas(df, deep=False)
        df = ufp.assign_columns(df, self.id_col, new_ids)
        df = ufp.sort(df, by=[self.id_col, self.time_col])
        values = df[self.target_col].to_numpy()
        values = values.astype(self.ga.data.dtype, copy=False)
        id_counts = ufp.counts_by_id(df, self.id_col)
        try:
            sizes = ufp.join(uids, id_counts, on=self.id_col, how='outer_coalesce')
        except (KeyError, ValueError):
            # pandas raises key error, polars before coalesce raises value error
            sizes = ufp.join(uids, id_counts, on=self.id_col, how='outer')
        sizes = ufp.fill_null(sizes, {'counts': 0})
        sizes = ufp.sort(sizes, by=self.id_col)
        new_groups = ~ufp.is_in(sizes[self.id_col], uids)
        last_dates = ufp.group_by_agg(df, self.id_col, {self.time_col: 'max'})
        last_dates = ufp.join(sizes, last_dates, on=self.id_col, how='left')
        curr_last_dates = type(df)({self.id_col: uids, '_curr': self.last_dates})
        last_dates = ufp.join(last_dates, curr_last_dates, on=self.id_col, how='left')
        last_dates = ufp.fill_null(last_dates, {self.time_col: last_dates['_curr']})
        last_dates = ufp.sort(last_dates, by=self.id_col)
        self.last_dates = ufp.cast(last_dates[self.time_col], self.last_dates.dtype)
        self.uids = ufp.sort(sizes[self.id_col])
        if isinstance(df, pd.DataFrame):
            self.uids = pd.Index(self.uids)
            self.last_dates = pd.Index(self.last_dates)
        if new_groups.any():
            if self.target_transforms is not None:
                raise ValueError('Can not update target_transforms with new series.')
            new_ids = ufp.filter_with_mask(sizes[self.id_col], new_groups)
            new_ids_df = ufp.filter_with_mask(df, ufp.is_in(df[self.id_col], new_ids))
            new_ids_counts = ufp.counts_by_id(new_ids_df, self.id_col)
            new_statics = ufp.take_rows(df, new_ids_counts["counts"].to_numpy().cumsum() - 1)
            new_statics = new_statics[self.static_features_.columns]
            self.static_features_ = ufp.vertical_concat([self.static_features_, new_statics])
            self.static_features_ = ufp.sort(self.static_features_, self.id_col)
        if self.target_transforms is not None:
            if self._has_ga_target_tfms():            
                indptr = np.append(0, id_counts['counts']).cumsum()
            for tfm in self.target_transforms:
                if isinstance(tfm, _BaseGroupedArrayTargetTransform):
                    ga = GroupedArray(values, indptr)
                    ga = tfm.update(ga)
                    df = ufp.assign_columns(df, self.target_col, ga.data)
                else:
                    df = tfm.update(df)
                values = df[self.target_col].to_numpy()                    
        self.ga = self.ga.append_several(
            new_sizes=sizes['counts'].to_numpy().astype(np.int32),
            new_values=values,
            new_groups=new_groups.to_numpy(),
        )
test_fail(lambda: TimeSeries(freq='D', lags=list(range(2))), contains='lags must be positive integers')
test_fail(lambda: TimeSeries(freq='D', lag_transforms={0: 1}), contains='keys of lag_transforms must be positive integers')
# differences
n = 7 * 14
x = pd.DataFrame(
    {
        'id': np.repeat(0, n),
        'ds': np.arange(n),
        'y': np.arange(7)[[x % 7 for x in np.arange(n)]]
    },
)
x['y'] = x['ds'] * 0.1 + x['y']
ts = TimeSeries(freq=1, target_transforms=[Differences([1, 7])])
ts._fit(x.iloc[:-14], id_col='id', time_col='ds', target_col='y')
ts.as_numpy = False
np.testing.assert_allclose(
    x['y'].diff(1).diff(7).values[:-14],
    ts.ga.data,
)
ts.y_pred = np.zeros(14)
class A:
    def fit(self, X):
        return self
    def predict(self, X):
        return np.zeros(X.shape[0])
xx = ts.predict({'A': A()}, 14)
np.testing.assert_allclose(xx['A'], x['y'].tail(14).values)
# transform namer
def namer(f, lag, *args):
    return f'hello_from_{f.__name__}'

ts = TimeSeries(
    freq=1,
    lag_transforms={1: [(rolling_mean, 7), expanding_mean]},
    lag_transforms_namer=namer,
)
transformed = ts.fit_transform(x, id_col='id', time_col='ds', target_col='y')
test_eq(
    transformed.columns.tolist(),
    ['id', 'ds', 'y', 'hello_from_rolling_mean', 'hello_from_expanding_mean'],
)
test_fail(lambda: TimeSeries(freq=1, date_features=[lambda: 1]), contains="Can't use a lambda")

The TimeSeries class takes care of defining the transformations to be performed (lags, lag_transforms and date_features). The transformations can be computed using multithreading if num_threads > 1.

def month_start_or_end(dates):
    return dates.is_month_start | dates.is_month_end

flow_config = dict(
    freq='W-THU',
    lags=[7],
    lag_transforms={
        1: [expanding_mean, (rolling_mean, 7)]
    },
    date_features=['dayofweek', 'week', month_start_or_end]
)

ts = TimeSeries(**flow_config)
ts
TimeSeries(freq=W-THU, transforms=['lag7', 'expanding_mean_lag1', 'rolling_mean_lag1_window_size7'], date_features=['dayofweek', 'week', 'month_start_or_end'], num_threads=1)
test_eq(
    TimeSeries(freq=ts.freq).freq,
    TimeSeries(freq='W-THU').freq
)

The frequency is converted to an offset.

test_eq(ts.freq, pd.tseries.frequencies.to_offset(flow_config['freq']))

The date features are stored as they were passed to the constructor.

test_eq(ts.date_features, flow_config['date_features'])

The transformations are stored as a dictionary where the key is the name of the transformation (the column name in the dataframe of computed features), which is built using _build_transform_name, and the value is a tuple whose first element is the lag the transformation is applied to, followed by the function and its arguments.

test_eq(
    ts.transforms, 
    {
        'lag7': Lag(7),
        'expanding_mean_lag1': (1, expanding_mean), 
        'rolling_mean_lag1_window_size7': (1, rolling_mean, 7)
        
    }
)

Note that for lags we define the transformation as the identity function applied to the corresponding lag. This is because _transform_series takes the lag as an argument and shifts the array before computing the transformation.

# int y is converted to float32
serie2 = serie.copy()
serie2['y'] = serie2['y'].astype(int)
ts = TimeSeries(num_threads=1, freq='D')
ts._fit(serie2, id_col='unique_id', time_col='ds', target_col='y')
test_eq(ts.ga.data.dtype, np.float32)
# _compute_transforms
y = serie.y.values
lag_1 = shift_array(y, 1)

for num_threads in (1, 2):
    ts = TimeSeries(**flow_config)
    ts._fit(serie, id_col='unique_id', time_col='ds', target_col='y')
    transforms = ts._compute_transforms(ts.transforms, updates_only=False)

    np.testing.assert_equal(transforms['lag7'], shift_array(y, 7))
    np.testing.assert_equal(transforms['expanding_mean_lag1'], expanding_mean(lag_1))
    np.testing.assert_equal(transforms['rolling_mean_lag1_window_size7'], rolling_mean(lag_1, 7))
# _update_y
ts = TimeSeries(freq='D', lags=[1])
ts._fit(serie, id_col='unique_id', time_col='ds', target_col='y')

max_size = np.diff(ts.ga.indptr)
ts._update_y([1])
ts._update_y([2])

test_eq(np.diff(ts.ga.indptr), max_size + 2)
test_eq(ts.ga.data[-2:], [1, 2])
# _update_features
ts = TimeSeries(**flow_config)
ts._fit(serie, id_col='unique_id', time_col='ds', target_col='y')
ts._predict_setup()
updates = ts._update_features()

last_date = serie['ds'].max()
first_prediction_date = last_date + pd.offsets.Day()

# these have an offset because we can now "see" our last y value
expected = pd.DataFrame({
    'unique_id': ts.uids,
    'lag7': shift_array(y, 6)[-1],
    'expanding_mean_lag1': expanding_mean(y)[-1],
    'rolling_mean_lag1_window_size7': rolling_mean(y, 7)[-1],
    'dayofweek': np.uint8([getattr(first_prediction_date, 'dayofweek')]),
    'week': np.uint8([first_prediction_date.isocalendar()[1]]),
    'month_start_or_end': month_start_or_end(first_prediction_date)
})
statics = serie.tail(1).drop(columns=['ds', 'y'])
pd.testing.assert_frame_equal(updates, statics.merge(expected))


test_eq(ts.curr_dates[0], first_prediction_date)
# _get_predictions
ts = TimeSeries(freq='D', lags=[1])
ts._fit(serie, id_col='unique_id', time_col='ds', target_col='y')
ts._predict_setup()
ts._update_features()
ts._update_y([1.])
preds = ts._get_predictions()

last_ds = serie['ds'].max()
expected = pd.DataFrame({'unique_id': serie['unique_id'][[0]], 'ds': [last_ds + pd.offsets.Day()], 'y_pred': [1.]})
pd.testing.assert_frame_equal(preds, expected)
show_doc(TimeSeries.fit_transform, title_level=2)

source

TimeSeries.fit_transform

 TimeSeries.fit_transform (data:Union[pandas.core.frame.DataFrame,polars.d
                           ataframe.frame.DataFrame], id_col:str,
                           time_col:str, target_col:str,
                           static_features:Optional[List[str]]=None,
                           dropna:bool=True,
                           keep_last_n:Optional[int]=None,
                           max_horizon:Optional[int]=None,
                           return_X_y:bool=False, as_numpy:bool=False)

*Add the features to data and save the required information for the predictions step.

If not all features are static, specify which ones are in static_features. If you don’t want to drop rows with null values after the transformations set dropna=False. If keep_last_n is not None then that number of observations is kept across all series for updates.*

flow_config = dict(
    freq='D',
    lags=[7, 14],
    lag_transforms={
        2: [
            (rolling_mean, 7),
            (rolling_mean, 14),
        ]
    },
    date_features=['dayofweek', 'month', 'year'],
    num_threads=2
)

ts = TimeSeries(**flow_config)
_ = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')

The series values are stored as a GroupedArray in the ga attribute. If the series values have an integer data type they are converted to np.float32, because lags generate np.nans, so we need a float data type to represent them.

np.testing.assert_equal(ts.ga.data, series.y.values)

The series ids are stored in the uids attribute.

test_eq(ts.uids, series['unique_id'].unique())

For each time series the last observed date is stored, so that predictions start at the last date plus the frequency.

test_eq(ts.last_dates, series.groupby('unique_id', observed=True)['ds'].max().values)

The last row of each series (excluding the y and ds columns) is taken as its static features.

pd.testing.assert_frame_equal(
    ts.static_features_,
    series.groupby('unique_id', observed=True).tail(1).drop(columns=['ds', 'y']).reset_index(drop=True),
)

If static_features is passed to TimeSeries.fit_transform then only those features are kept.

ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y', static_features=['static_0'])

pd.testing.assert_frame_equal(
    ts.static_features_,
    series.groupby('unique_id', observed=True).tail(1)[['unique_id', 'static_0']].reset_index(drop=True),
)

You can also specify keep_last_n in TimeSeries.fit_transform, which means that after computing the features for training we only want to keep the last n samples of each time series for computing the updates. This saves both memory and time, since the updates are performed by running the transformation functions on all time series again and keeping only the last value (the update).

If you have very long time series and the updates only require a small number of samples, it's recommended to set keep_last_n to the minimum number of samples required to compute the updates, which in this case is 15, since we have a rolling mean of size 14 over lag 2 and in the first update lag 2 becomes lag 1. This is because in the first update lag 1 is the last value of the series (lag 0), lag 2 becomes lag 1, and so on.

keep_last_n = 15

ts = TimeSeries(**flow_config)
df = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y', keep_last_n=keep_last_n)
ts._predict_setup()

expected_lags = ['lag7', 'lag14']
expected_transforms = ['rolling_mean_lag2_window_size7', 
                       'rolling_mean_lag2_window_size14']
expected_date_features = ['dayofweek', 'month', 'year']

test_eq(ts.features, expected_lags + expected_transforms + expected_date_features)
test_eq(ts.static_features_.columns.tolist() + ts.features, df.columns.drop(['ds', 'y']).tolist())
# we dropped 2 rows because of lag 2 and 13 more to get the window of size 14
test_eq(df.shape[0], series.shape[0] - (2 + 13) * ts.ga.n_groups)
test_eq(ts.ga.data.size, ts.ga.n_groups * keep_last_n)

TimeSeries.fit_transform requires that the y column has no null values. This is because the transformations could propagate them forward, so if there are null values in the y column you'll get an error.

series_with_nulls = series.copy()
series_with_nulls.loc[1, 'y'] = np.nan
test_fail(
    lambda: ts.fit_transform(series_with_nulls, id_col='unique_id', time_col='ds', target_col='y'),
    contains='y column contains null values'
)
# unsorted dataframe
ts = TimeSeries(**flow_config)
df = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
unordered_series = series.sample(frac=1.0)
assert not unordered_series.set_index('ds', append=True).index.is_monotonic_increasing
df2 = ts.fit_transform(unordered_series, id_col='unique_id', time_col='ds', target_col='y')
pd.testing.assert_frame_equal(
    df.reset_index(drop=True),
    df2.sort_values(['unique_id', 'ds']).reset_index(drop=True)
)
# existing features aren't recomputed
df_with_features = pd.DataFrame({
    'unique_id': [1, 1, 1],
    'ds': pd.date_range('2000-01-01', freq='D', periods=3),
    'y': [10., 11., 12.],
    'lag1': [1, 1, 1],
    'month': [12, 12, 12],
    
})
ts = TimeSeries(freq='D', lags=[1, 2], date_features=['year', 'month'])
transformed = ts.fit_transform(df_with_features, id_col='unique_id', time_col='ds', target_col='y', dropna=False)
pd.testing.assert_series_equal(transformed['lag1'], df_with_features['lag1'])
pd.testing.assert_series_equal(transformed['month'], df_with_features['month'])
np.testing.assert_array_equal(transformed['year'], 3 * [2000])
np.testing.assert_array_equal(transformed['lag2'].values, [np.nan, np.nan, 10.])
# non-standard column names
ts = TimeSeries(**flow_config)
df = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
non_std_series = series.reset_index().rename(columns={'unique_id': 'some_id', 'ds': 'timestamp', 'y': 'value'})
non_std_res = ts.fit_transform(
    non_std_series, id_col='some_id', time_col='timestamp', target_col='value', static_features=[]
)
non_std_res = non_std_res.reset_index(drop=True)
pd.testing.assert_frame_equal(
    df.reset_index(),
    non_std_res.rename(columns={'timestamp': 'ds', 'value': 'y', 'some_id': 'unique_id'})
)
# integer timestamps
def identity(x):
    return x

flow_config_int_ds = copy.deepcopy(flow_config)
flow_config_int_ds['date_features'] = [identity]
flow_config_int_ds['freq'] = 1
ts = TimeSeries(**flow_config_int_ds)
int_ds_series = series.copy()
int_ds_series['ds'] = int_ds_series['ds'].astype('int64')
int_ds_res = ts.fit_transform(int_ds_series, id_col='unique_id', time_col='ds', target_col='y')
int_ds_res['ds'] = pd.to_datetime(int_ds_res['ds'])
int_ds_res['identity'] = pd.to_datetime(int_ds_res['ds'])
df2 = df.drop(columns=flow_config['date_features'])
df2['identity'] = df2['ds']
pd.testing.assert_frame_equal(df2, int_ds_res)
show_doc(TimeSeries.predict, title_level=2)

source

TimeSeries.predict

 TimeSeries.predict (models:Dict[str,Union[sklearn.base.BaseEstimator,List
                     [sklearn.base.BaseEstimator]]], horizon:int,
                     before_predict_callback:Optional[Callable]=None,
                     after_predict_callback:Optional[Callable]=None, X_df:
                     Union[pandas.core.frame.DataFrame,polars.dataframe.fr
                     ame.DataFrame,NoneType]=None,
                     ids:Optional[List[str]]=None)

Once we have a trained model we can use the TimeSeries.predict method, passing the model and the horizon, to get the predictions.

class DummyModel:
    def predict(self, X: pd.DataFrame) -> np.ndarray:
        return X['lag7'].values

horizon = 7
model = DummyModel()
ts = TimeSeries(**flow_config)
ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
predictions = ts.predict({'DummyModel': model}, horizon)

grouped_series = series.groupby('unique_id', observed=True)
expected_preds = grouped_series['y'].tail(7)  # the model predicts lag 7
last_dates = grouped_series['ds'].max()
expected_dsmin = last_dates + pd.offsets.Day()
expected_dsmax = last_dates + horizon * pd.offsets.Day()
grouped_preds = predictions.groupby('unique_id', observed=True)

np.testing.assert_allclose(predictions['DummyModel'], expected_preds)
pd.testing.assert_series_equal(grouped_preds['ds'].min(), expected_dsmin)
pd.testing.assert_series_equal(grouped_preds['ds'].max(), expected_dsmax)
model = DummyModel()
ts = TimeSeries(**flow_config)
ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
predictions = ts.predict({'DummyModel': model}, horizon=horizon)
ts = TimeSeries(**flow_config_int_ds)
ts.fit_transform(int_ds_series, id_col='unique_id', time_col='ds', target_col='y')
int_ds_predictions = ts.predict({'DummyModel': model}, horizon=horizon)
pd.testing.assert_frame_equal(predictions.drop(columns='ds'), int_ds_predictions.drop(columns='ds'))

If we have dynamic features, we can pass them through X_df.

class PredictPrice:
    def predict(self, X):
        return X['price']

series = generate_daily_series(20, n_static_features=2, equal_ends=True)
dynamic_series = series.rename(columns={'static_1': 'product_id'})
prices_catalog = generate_prices_for_series(dynamic_series)
series_with_prices = dynamic_series.merge(prices_catalog, how='left')

model = PredictPrice()
ts = TimeSeries(**flow_config)
ts.fit_transform(
    series_with_prices,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
    static_features=['static_0', 'product_id'],
)
predictions = ts.predict({'PredictPrice': model}, horizon=1, X_df=prices_catalog)
pd.testing.assert_frame_equal(
    predictions.rename(columns={'PredictPrice': 'price'}),
    prices_catalog.merge(predictions[['unique_id', 'ds']])[['unique_id', 'ds', 'price']]
)
# predict subset
sample_ids = ['id_00', 'id_16']
sample_preds = ts.predict({'price': model}, 1, X_df=prices_catalog, ids=sample_ids)
pd.testing.assert_frame_equal(
    sample_preds,
    prices_catalog.merge(predictions[predictions['unique_id'].isin(sample_ids)][['unique_id', 'ds']])[['unique_id', 'ds', 'price']]
)
test_fail(lambda: ts.predict({'y': model}, 1, ids=['bonjour']), contains="{'bonjour'}")
show_doc(TimeSeries.update, title_level=2)

source

TimeSeries.update

 TimeSeries.update
                    (df:Union[pandas.core.frame.DataFrame,polars.dataframe
                    .frame.DataFrame])

Update the values of the stored series.

class SeasonalNaiveModel:
    def predict(self, X):
        return X['lag7']

class NaiveModel:
    def predict(self, X: pd.DataFrame):
        return X['lag1']

two_series = series[series['unique_id'].isin(['id_00', 'id_19'])].copy()
two_series['unique_id'] = pd.Categorical(two_series['unique_id'], ['id_00', 'id_19'])
ts = TimeSeries(freq='D', lags=[1], date_features=['dayofweek'])
ts.fit_transform(
    two_series,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
)
last_vals_two_series = two_series.groupby('unique_id', observed=True).tail(1)
last_val_id0 = last_vals_two_series[lambda x: x['unique_id'].eq('id_00')].copy()
new_values = last_val_id0.copy()
new_values['ds'] += pd.offsets.Day()
new_serie = pd.DataFrame({
    'unique_id': ['new_idx', 'new_idx'],
    'ds': pd.to_datetime(['2020-01-01', '2020-01-02']),
    'y': [5.0, 6.0],
    'static_0': [0, 0],
    'static_1': [1, 1],
})
new_values = pd.concat([new_values, new_serie])
ts.update(new_values)
preds = ts.predict({'Naive': NaiveModel()}, 1)
expected_id0 = last_val_id0.copy()
expected_id0['ds'] += pd.offsets.Day()
expected_id1 = last_vals_two_series[lambda x: x['unique_id'].eq('id_19')].copy()
last_val_new_serie = new_serie.tail(1)[['unique_id', 'ds', 'y']]
expected = pd.concat([expected_id0, expected_id1, last_val_new_serie])
expected = expected[['unique_id', 'ds', 'y']]
expected = expected.rename(columns={'y': 'Naive'}).reset_index(drop=True)
expected['unique_id'] = pd.Categorical(expected['unique_id'], categories=['id_00', 'id_19', 'new_idx'])
expected['ds'] += pd.offsets.Day()
pd.testing.assert_frame_equal(preds, expected)
pd.testing.assert_frame_equal(
    ts.static_features_,
    (
        pd.concat([last_vals_two_series, new_serie.tail(1)])
        [['unique_id', 'static_0', 'static_1']]
        .astype(ts.static_features_.dtypes)
        .reset_index(drop=True)
    )
)
# with target transforms
ts = TimeSeries(
    freq='D',
    lags=[7],
    target_transforms=[Differences([1, 2]), LocalStandardScaler()],
)
ts.fit_transform(two_series, id_col='unique_id', time_col='ds', target_col='y')
new_values = two_series.groupby('unique_id', observed=True).tail(7).copy()
new_values['ds'] += 7 * pd.offsets.Day()
orig_last7 = ts.ga.take_from_groups(slice(-7, None)).data
ts.update(new_values)
preds = ts.predict({'SeasonalNaive': SeasonalNaiveModel()}, 7)
np.testing.assert_allclose(
    new_values['y'].values,
    preds['SeasonalNaive'].values,
)
last7 = ts.ga.take_from_groups(slice(-7, None)).data
assert 0 < np.abs(last7 / orig_last7 - 1).mean() < 0.5
#| polars
two_series = generate_daily_series(2, n_static_features=2, engine='polars')
ts = TimeSeries(freq='1d', lags=[1], date_features=['weekday'])
ts.fit_transform(
    two_series,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
)
last_vals_two_series = two_series.join(
    two_series.group_by('unique_id').agg(pl.col('ds').max()), on=['unique_id', 'ds']
)
last_val_id0 = last_vals_two_series.filter(pl.col('unique_id') == 'id_0')
new_values = last_val_id0.with_columns(
    pl.col('unique_id').cast(pl.Categorical),
    pl.col('ds').dt.offset_by('1d'),
    pl.col('static_0').cast(pl.Int64),
    pl.col('static_1').cast(pl.Int64),
)
new_serie = pl.DataFrame({
    'unique_id': ['new_idx', 'new_idx'],
    'ds': [datetime.datetime(2020, 1, 1), datetime.datetime(2020, 1, 2)],
    'y': [5.0, 6.0],
    'static_0': [0, 0],
    'static_1': [1, 1],
}).with_columns(
    pl.col('ds').dt.cast_time_unit('ns'),
    pl.col('unique_id').cast(pl.Categorical),
)
new_values = pl.concat([new_values, new_serie])
ts.update(new_values)
preds = ts.predict({'Naive': NaiveModel()}, 1)
expected_id0 = last_val_id0.with_columns(pl.col('ds').dt.offset_by('1d'))
expected_id1 = last_vals_two_series.filter(pl.col('unique_id') == 'id_1')
last_val_new_serie = new_serie.tail(1)
expected = pl.concat([expected_id0, expected_id1])
expected = ufp.vertical_concat([expected, last_val_new_serie])
pd.testing.assert_series_equal(
    expected['unique_id'].cat.get_categories().to_pandas(),
    pd.Series(['id_0', 'id_1', 'new_idx'], name='unique_id')
)
expected = expected[['unique_id', 'ds', 'y']]
expected = ufp.rename(expected, {'y': 'Naive'})
expected = expected.with_columns(pl.col('ds').dt.offset_by('1d'))
pd.testing.assert_frame_equal(preds.to_pandas(), expected.to_pandas())
pd.testing.assert_frame_equal(
    ts.static_features_.to_pandas(),
    (
        ufp.vertical_concat([last_vals_two_series, new_serie.tail(1)])
        [['unique_id', 'static_0', 'static_1']]
        .to_pandas()
        .astype(ts.static_features_.to_pandas().dtypes)
        .reset_index(drop=True)
    )
)
# with target transforms
ts = TimeSeries(
    freq='1d',
    lags=[7],
    target_transforms=[Differences([1, 2]), LocalStandardScaler()],
)
ts.fit_transform(two_series, id_col='unique_id', time_col='ds', target_col='y')
new_values = two_series.group_by('unique_id').tail(7)
new_values = new_values.with_columns(pl.col('ds').dt.offset_by('7d'))
orig_last7 = ts.ga.take_from_groups(slice(-7, None)).data
ts.update(new_values)
preds = ts.predict({'SeasonalNaive': SeasonalNaiveModel()}, 7)
np.testing.assert_allclose(
    new_values['y'].to_numpy(),
    preds['SeasonalNaive'].to_numpy(),
)
last7 = ts.ga.take_from_groups(slice(-7, None)).data
assert 0 < np.abs(last7 / orig_last7 - 1).mean() < 0.5
sys:1: CategoricalRemappingWarning: Local categoricals have different encodings, expensive re-encoding is done to perform this merge operation. Consider using a StringCache or an Enum type if the categories are known in advance
# target_transforms with keep_last_n
ts = TimeSeries(freq='D', lags=[1], target_transforms=[LocalStandardScaler()])
ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y', keep_last_n=10)
preds = ts.predict({'y': NaiveModel()}, 1)
expected = series.groupby('unique_id', observed=True).tail(1)[['unique_id', 'ds', 'y']].reset_index(drop=True)
expected['ds'] += pd.offsets.Day()
pd.testing.assert_frame_equal(preds, expected)
# raise an error in predict if static_features was omitted during fit and those features are passed as dynamic
valid = series.groupby('unique_id', observed=True).tail(10)
train = series.drop(valid.index)
ts = TimeSeries(freq='D', lags=[1], target_transforms=[LocalStandardScaler()])
ts.fit_transform(train, id_col='unique_id', time_col='ds', target_col='y', keep_last_n=10)
test_fail(lambda: ts.predict({'y': NaiveModel()}, 1, X_df=valid.drop(columns=['y'])), contains="['static_0', 'static_1']")
#| polars
series_pl = generate_daily_series(5, static_as_categorical=False, n_static_features=5, engine='polars')
series_pd = generate_daily_series(5, static_as_categorical=False, n_static_features=5, engine='pandas')
series_pl = series_pl.with_columns(pl.col('unique_id').cast(str))
series_pd['unique_id'] = series_pd['unique_id'].astype(str)

cfg = dict(
    lags=[1, 2, 3, 4],
    lag_transforms={
        1: [expanding_mean, (rolling_mean, 7), (rolling_mean, 14)],
        2: [expanding_mean, (rolling_mean, 7), (rolling_mean, 14)],
        3: [expanding_mean, (rolling_mean, 7), (rolling_mean, 14)],
        4: [expanding_mean, (rolling_mean, 7), (rolling_mean, 14)],
    },
    date_features=['day', 'month', 'quarter', 'year'],
    target_transforms=[Differences([1])],
)
feats_pl = SaveFeatures()
ts_pl = TimeSeries(freq='1d', **cfg)
prep_pl = ts_pl.fit_transform(series_pl, 'unique_id', 'ds', 'y')
fcst_pl = ts_pl.predict({'y': NaiveModel()}, 2, before_predict_callback=feats_pl)

feats_pd = SaveFeatures()
ts_pd = TimeSeries(freq='1D', **cfg)
prep_pd = ts_pd.fit_transform(series_pd, 'unique_id', 'ds', 'y')
fcst_pd = ts_pd.predict({'y': NaiveModel()}, 2, before_predict_callback=feats_pd)

prep_pd = prep_pd.reset_index(drop=True)
prep_pl = prep_pl.to_pandas()
fcst_pl = fcst_pl.to_pandas()
# date features have different dtypes
pd.testing.assert_frame_equal(prep_pl, prep_pd, check_dtype=False)
pd.testing.assert_frame_equal(
    feats_pl.get_features(with_step=True).to_pandas(),
    feats_pd.get_features(with_step=True),
    check_dtype=False,
)
pd.testing.assert_frame_equal(fcst_pl, fcst_pd)
# dropped series
for ordered in [True, False]:
    series = generate_daily_series(10, min_length=5, max_length=20)
    if not ordered:
        series = series.sample(frac=1.0, random_state=40)
    ts = TimeSeries(freq='D', lags=[10])
    with warnings.catch_warnings(record=True):
        prep = ts.fit_transform(series, 'unique_id', 'ds', 'y')
    dropped = ts.uids[ts._dropped_series].tolist()
    assert not prep['unique_id'].isin(dropped).any()
    assert set(prep['unique_id'].unique().tolist() + dropped) == set(series['unique_id'].unique())
# short series exception
series = generate_daily_series(2, min_length=5, max_length=15)
ts = TimeSeries(freq='D', lags=[1], target_transforms=[Differences([20])])
test_fail(
    lambda: ts.fit_transform(series, 'unique_id', 'ds', 'y'),
    contains="are too short for the 'Differences' transformation"
)
# test predictions
class Lag1PlusOneModel:
    def predict(self, X):
        return X['lag1'] + 1

ts = TimeSeries(freq='D', lags=[1])
for max_horizon in [None, 2]:
    if max_horizon is None:
        mod1 = Lag1PlusOneModel()
        mod2 = Lag1PlusOneModel()
    else:
        mod1 = [Lag1PlusOneModel() for _ in range(max_horizon)]
        mod2 = [Lag1PlusOneModel() for _ in range(max_horizon)]
    ts.fit_transform(train, 'unique_id', 'ds', 'y', max_horizon=max_horizon)
    # each model got the right historic values
    preds = ts.predict(models={'mod1': mod1, 'mod2': mod2}, horizon=2)
    np.testing.assert_allclose(preds['mod1'], preds['mod2'])
    # idempotency
    preds2 = ts.predict(models={'mod1': mod1, 'mod2': mod2}, horizon=2)
    np.testing.assert_allclose(preds2['mod1'], preds2['mod2'])
    pd.testing.assert_frame_equal(preds, preds2)
# save & load
series = generate_daily_series(2, n_static_features=2)
ts = TimeSeries(
    freq='D',
    lags=[1, 2],
    date_features=['dayofweek'],
    lag_transforms={
        1: [RollingMean(1)]
    },
    target_transforms=[Differences([20])],
)
ts.fit_transform(series, 'unique_id', 'ds', 'y')
with tempfile.TemporaryDirectory() as tmpdir:
    fname = Path(tmpdir) / 'hi'
    ts.save(fname)
    ts2 = TimeSeries.load(fname)
preds = ts.predict({'model': NaiveModel()}, 10)
preds2 = ts2.predict({'model': NaiveModel()}, 10)
pd.testing.assert_frame_equal(preds, preds2)
