%load_ext autoreload
%autoreload 2
Core
import copy
import inspect
import reprlib
import warnings
from collections import Counter, OrderedDict
from contextlib import contextmanager
from pathlib import Path
from typing import (
Any,
Callable,
Dict,
Iterable,
Iterator,
List,
Mapping,
Optional,
Tuple,
Union,
)
import cloudpickle
import fsspec
import numpy as np
import pandas as pd
import utilsforecast.processing as ufp
from sklearn.base import BaseEstimator, clone
from sklearn.pipeline import Pipeline
from utilsforecast.compat import (
DFType,
DataFrame,
pl,
pl_DataFrame,
pl_Series,
)
from utilsforecast.validation import validate_format, validate_freq
from mlforecast.grouped_array import GroupedArray
from mlforecast.lag_transforms import _BaseLagTransform, Lag
from mlforecast.target_transforms import (
_BaseGroupedArrayTargetTransform,
BaseTargetTransform,
)
from mlforecast.utils import _ShortSeriesException, _ensure_shallow_copy
import datetime
import tempfile
from nbdev import show_doc
from fastcore.test import test_eq, test_fail, test_warns
from window_ops.expanding import expanding_mean
from window_ops.rolling import rolling_mean
from window_ops.shift import shift_array
from mlforecast.callbacks import SaveFeatures
from mlforecast.lag_transforms import ExpandingMean, RollingMean
from mlforecast.target_transforms import Differences, LocalStandardScaler
from mlforecast.utils import generate_daily_series, generate_prices_for_series
Data format

The required input format is a dataframe with at least the following columns:

* unique_id with a unique identifier for each time series
* ds with the datestamp
* y with the values of the series

Every other column is considered a static feature unless stated otherwise in TimeSeries.fit.
series = generate_daily_series(20, n_static_features=2)
series
| unique_id | ds | y | static_0 | static_1 |
---|---|---|---|---|---|
0 | id_00 | 2000-01-01 | 7.404529 | 27 | 53 |
1 | id_00 | 2000-01-02 | 35.952624 | 27 | 53 |
2 | id_00 | 2000-01-03 | 68.958353 | 27 | 53 |
3 | id_00 | 2000-01-04 | 84.994505 | 27 | 53 |
4 | id_00 | 2000-01-05 | 113.219810 | 27 | 53 |
... | ... | ... | ... | ... | ... |
4869 | id_19 | 2000-03-25 | 400.606807 | 97 | 45 |
4870 | id_19 | 2000-03-26 | 538.794824 | 97 | 45 |
4871 | id_19 | 2000-03-27 | 620.202104 | 97 | 45 |
4872 | id_19 | 2000-03-28 | 20.625426 | 97 | 45 |
4873 | id_19 | 2000-03-29 | 141.513169 | 97 | 45 |
4874 rows × 5 columns
For simplicity we'll only consider a single time series here.
uids = series['unique_id'].unique()
serie = series[series['unique_id'].eq(uids[0])]
serie
| unique_id | ds | y | static_0 | static_1 |
---|---|---|---|---|---|
0 | id_00 | 2000-01-01 | 7.404529 | 27 | 53 |
1 | id_00 | 2000-01-02 | 35.952624 | 27 | 53 |
2 | id_00 | 2000-01-03 | 68.958353 | 27 | 53 |
3 | id_00 | 2000-01-04 | 84.994505 | 27 | 53 |
4 | id_00 | 2000-01-05 | 113.219810 | 27 | 53 |
... | ... | ... | ... | ... | ... |
217 | id_00 | 2000-08-05 | 13.263188 | 27 | 53 |
218 | id_00 | 2000-08-06 | 38.231981 | 27 | 53 |
219 | id_00 | 2000-08-07 | 59.555183 | 27 | 53 |
220 | id_00 | 2000-08-08 | 86.986368 | 27 | 53 |
221 | id_00 | 2000-08-09 | 119.254810 | 27 | 53 |
222 rows × 5 columns
date_features_dtypes = {
    "year": np.uint16,
    "month": np.uint8,
    "day": np.uint8,
    "hour": np.uint8,
    "minute": np.uint8,
    "second": np.uint8,
    "dayofyear": np.uint16,
    "day_of_year": np.uint16,
    "weekofyear": np.uint8,
    "week": np.uint8,
    "dayofweek": np.uint8,
    "day_of_week": np.uint8,
    "weekday": np.uint8,
    "quarter": np.uint8,
    "daysinmonth": np.uint8,
    "is_month_start": np.uint8,
    "is_month_end": np.uint8,
    "is_quarter_start": np.uint8,
    "is_quarter_end": np.uint8,
    "is_year_start": np.uint8,
    "is_year_end": np.uint8,
}
def _build_function_transform_name(tfm: Callable, lag: int, *args) -> str:
    """Creates a name for a transformation based on `lag`, the name of the function and its arguments."""
    tfm_name = f"{tfm.__name__}_lag{lag}"
    func_params = inspect.signature(tfm).parameters
    func_args = list(func_params.items())[1:]  # remove input array argument
    changed_params = [
        f"{name}{value}"
        for value, (name, arg) in zip(args, func_args)
        if arg.default != value
    ]
    if changed_params:
        tfm_name += "_" + "_".join(changed_params)
    return tfm_name
test_eq(_build_function_transform_name(expanding_mean, 1), 'expanding_mean_lag1')
test_eq(_build_function_transform_name(rolling_mean, 2, 7), 'rolling_mean_lag2_window_size7')
def _build_lag_transform_name(tfm: _BaseLagTransform, lag: int) -> str:
return tfm._get_name(lag)
test_eq(_build_lag_transform_name(ExpandingMean(), 1), 'expanding_mean_lag1')
test_eq(_build_lag_transform_name(RollingMean(7), 2), 'rolling_mean_lag2_window_size7')
def _build_transform_name(
    tfm: Union[Callable, _BaseLagTransform], lag: int, *args
) -> str:
    if callable(tfm):
        name = _build_function_transform_name(tfm, lag, *args)
    else:
        name = _build_lag_transform_name(tfm, lag)
    return name
def _get_model_name(model) -> str:
if isinstance(model, Pipeline):
return _get_model_name(model.steps[-1][1])
return model.__class__.__name__
def _name_models(current_names):
    ctr = Counter(current_names)
    if not ctr:
        return []
    if max(ctr.values()) < 2:
        return current_names
    names = current_names.copy()
    for i, x in enumerate(reversed(current_names), start=1):
        count = ctr[x]
        if count > 1:
            name = f"{x}{count}"
            ctr[x] -= 1
        else:
            name = x
        names[-i] = name
    return names
# one duplicate
names = ['a', 'b', 'a', 'c']
expected = ['a', 'b', 'a2', 'c']
actual = _name_models(names)
assert actual == expected
# no duplicates
names = ['a', 'b', 'c']
actual = _name_models(names)
assert actual == names
def _as_tuple(x):
    """Return a tuple from the input."""
    if isinstance(x, tuple):
        return x
    return (x,)
Freq = Union[int, str]
Lags = Iterable[int]
LagTransform = Union[Callable, Tuple[Callable, Any]]
LagTransforms = Dict[int, List[LagTransform]]
DateFeature = Union[str, Callable]
Models = Union[BaseEstimator, List[BaseEstimator], Dict[str, BaseEstimator]]
TargetTransform = Union[BaseTargetTransform, _BaseGroupedArrayTargetTransform]
Transforms = Dict[str, Union[Tuple[Any, ...], _BaseLagTransform]]
def _parse_transforms(
    lags: Lags,
    lag_transforms: LagTransforms,
    namer: Optional[Callable] = None,
) -> Transforms:
    transforms: Transforms = OrderedDict()
    if namer is None:
        namer = _build_transform_name
    for lag in lags:
        transforms[f'lag{lag}'] = Lag(lag)
    for lag in lag_transforms.keys():
        for tfm in lag_transforms[lag]:
            if isinstance(tfm, _BaseLagTransform):
                tfm_name = namer(tfm, lag)
                transforms[tfm_name] = clone(tfm)._set_core_tfm(lag)
            else:
                tfm, *args = _as_tuple(tfm)
                assert callable(tfm)
                tfm_name = namer(tfm, lag, *args)
                transforms[tfm_name] = (lag, tfm, *args)
    return transforms
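For example, a quick sketch (not part of the library) of what _parse_transforms produces for a small spec, using rolling_mean imported above: plain lags map to Lag objects and function-based transforms to (lag, func, *args) tuples, keyed by the names generated by _build_transform_name.

tfms = _parse_transforms(lags=[1], lag_transforms={2: [(rolling_mean, 7)]})
# keys become the feature column names
assert list(tfms) == ['lag1', 'rolling_mean_lag2_window_size7']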
class TimeSeries:
    """Utility class for storing and transforming time series data."""

    def __init__(
        self,
        freq: Freq,
        lags: Optional[Lags] = None,
        lag_transforms: Optional[LagTransforms] = None,
        date_features: Optional[Iterable[DateFeature]] = None,
        num_threads: int = 1,
        target_transforms: Optional[List[TargetTransform]] = None,
        lag_transforms_namer: Optional[Callable] = None,
    ):
        self.freq = freq
        if not isinstance(num_threads, int) or num_threads < 1:
            warnings.warn('Setting num_threads to 1.')
            num_threads = 1
        self.lags = [] if lags is None else list(lags)
        for lag in self.lags:
            if lag <= 0 or not isinstance(lag, int):
                raise ValueError('lags must be positive integers.')
        self.lag_transforms = {} if lag_transforms is None else lag_transforms
        for lag in self.lag_transforms.keys():
            if lag <= 0 or not isinstance(lag, int):
                raise ValueError('keys of lag_transforms must be positive integers.')
        self.date_features = [] if date_features is None else list(date_features)
        self.num_threads = num_threads
        self.target_transforms = target_transforms
        if self.target_transforms is not None:
            for tfm in self.target_transforms:
                if isinstance(tfm, _BaseGroupedArrayTargetTransform):
                    tfm.set_num_threads(num_threads)
        for feature in self.date_features:
            if callable(feature) and feature.__name__ == '<lambda>':
                raise ValueError(
                    "Can't use a lambda as a date feature because the function name gets used as the feature name."
                )
        self.lag_transforms_namer = lag_transforms_namer
        self.transforms = _parse_transforms(
            lags=self.lags, lag_transforms=self.lag_transforms, namer=lag_transforms_namer
        )
        self.ga: GroupedArray

    def _get_core_lag_tfms(self) -> Dict[str, _BaseLagTransform]:
        return {k: v for k, v in self.transforms.items() if isinstance(v, _BaseLagTransform)}

    @property
    def _date_feature_names(self):
        return [f.__name__ if callable(f) else f for f in self.date_features]

    @property
    def features(self) -> List[str]:
        """Names of all computed features."""
        return list(self.transforms.keys()) + self._date_feature_names

    def __repr__(self):
        return (
            f"TimeSeries(freq={self.freq}, "
            f"transforms={list(self.transforms.keys())}, "
            f"date_features={self._date_feature_names}, "
            f"num_threads={self.num_threads})"
        )
    def _fit(
        self,
        df: DataFrame,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        keep_last_n: Optional[int] = None,
    ) -> 'TimeSeries':
        """Save the series values, ids and last dates."""
        validate_format(df, id_col, time_col, target_col)
        validate_freq(df[time_col], self.freq)
        if ufp.is_nan_or_none(df[target_col]).any():
            raise ValueError(f'{target_col} column contains null values.')
        self.id_col = id_col
        self.target_col = target_col
        self.time_col = time_col
        self.keep_last_n = keep_last_n
        self.static_features = static_features
        sorted_df = df[[id_col, time_col, target_col]]
        sorted_df = ufp.copy_if_pandas(sorted_df, deep=False)
        uids, times, data, indptr, self._sort_idxs = ufp.process_df(
            df=sorted_df,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        if data.ndim == 2:
            data = data[:, 0]
        ga = GroupedArray(data, indptr)
        if isinstance(df, pd.DataFrame):
            self.uids = pd.Index(uids)
            self.last_dates = pd.Index(times)
        else:
            self.uids = uids
            self.last_dates = pl_Series(times)
        if self._sort_idxs is not None:
            self._restore_idxs: Optional[np.ndarray] = np.empty(df.shape[0], dtype=np.int32)
            self._restore_idxs[self._sort_idxs] = np.arange(df.shape[0])
            sorted_df = ufp.take_rows(sorted_df, self._sort_idxs)
        else:
            self._restore_idxs = None
        if self.target_transforms is not None:
            for tfm in self.target_transforms:
                if isinstance(tfm, _BaseGroupedArrayTargetTransform):
                    try:
                        ga = tfm.fit_transform(ga)
                    except _ShortSeriesException as exc:
                        tfm_name = tfm.__class__.__name__
                        uids = reprlib.repr(list(self.uids[exc.args]))
                        raise ValueError(
                            f"The following series are too short for the '{tfm_name}' transformation: {uids}."
                        ) from None
                    sorted_df = ufp.assign_columns(sorted_df, target_col, ga.data)
                else:
                    tfm.set_column_names(id_col, time_col, target_col)
                    sorted_df = tfm.fit_transform(sorted_df)
                    ga.data = sorted_df[target_col].to_numpy()
        to_drop = [id_col, time_col, target_col]
        if static_features is None:
            static_features = [c for c in df.columns if c not in [time_col, target_col]]
        elif id_col not in static_features:
            static_features = [id_col] + static_features
        else:  # static_features defined and contain id_col
            to_drop = [time_col, target_col]
        self.ga = ga
        series_starts = ga.indptr[:-1]
        series_ends = ga.indptr[1:] - 1
        if self._sort_idxs is not None:
            series_starts = self._sort_idxs[series_starts]
            series_ends = self._sort_idxs[series_ends]
        statics_on_starts = ufp.drop_index_if_pandas(
            ufp.take_rows(df, series_starts)[static_features]
        )
        statics_on_ends = ufp.drop_index_if_pandas(
            ufp.take_rows(df, series_ends)[static_features]
        )
        for feat in static_features:
            if (statics_on_starts[feat] != statics_on_ends[feat]).any():
                raise ValueError(
                    f'{feat} is declared as a static feature but its values change '
                    'over time. Please set the `static_features` argument to '
                    'indicate which features are static.\nIf all of your features '
                    'are dynamic please set `static_features=[]`.'
                )
        self.static_features_ = statics_on_ends
        self.features_order_ = [c for c in df.columns if c not in to_drop] + self.features
        return self
    def _compute_transforms(
        self,
        transforms: Mapping[str, Union[Tuple[Any, ...], _BaseLagTransform]],
        updates_only: bool,
    ) -> Dict[str, np.ndarray]:
        """Compute the transformations defined in the constructor.

        If `self.num_threads > 1` these are computed using multithreading."""
        if self.num_threads == 1 or len(transforms) == 1:
            out = self.ga.apply_transforms(
                transforms=transforms, updates_only=updates_only
            )
        else:
            out = self.ga.apply_multithreaded_transforms(
                transforms=transforms,
                num_threads=self.num_threads,
                updates_only=updates_only,
            )
        return out
    def _compute_date_feature(self, dates, feature):
        if callable(feature):
            feat_name = feature.__name__
            feat_vals = feature(dates)
        else:
            feat_name = feature
            if isinstance(dates, pd.DatetimeIndex):
                if feature in ('week', 'weekofyear'):
                    dates = dates.isocalendar()
                feat_vals = getattr(dates, feature)
            else:
                feat_vals = getattr(dates.dt, feature)()
        if isinstance(feat_vals, (pd.Index, pd.Series)):
            feat_vals = np.asarray(feat_vals)
            feat_dtype = date_features_dtypes.get(feature)
            if feat_dtype is not None:
                feat_vals = feat_vals.astype(feat_dtype)
        return feat_name, feat_vals
    def _transform(
        self,
        df: DFType,
        dropna: bool = True,
        max_horizon: Optional[int] = None,
        return_X_y: bool = False,
        as_numpy: bool = False,
    ) -> DFType:
        """Add the features to `df`.

        If `dropna=True` then all the rows with null values are dropped."""
        transforms = {k: v for k, v in self.transforms.items() if k not in df}
        features = self._compute_transforms(transforms=transforms, updates_only=False)
        if self._restore_idxs is not None:
            for k, v in features.items():
                features[k] = v[self._restore_idxs]

        # target
        self.max_horizon = max_horizon
        if max_horizon is None:
            target = self.ga.data
        else:
            target = self.ga.expand_target(max_horizon)
        if self._restore_idxs is not None:
            target = target[self._restore_idxs]

        # determine rows to keep
        if dropna:
            feature_nulls = np.full(df.shape[0], False)
            for feature_vals in features.values():
                feature_nulls |= np.isnan(feature_vals)
            target_nulls = np.isnan(target)
            if target_nulls.ndim == 2:
                # target nulls for each horizon are dropped in MLForecast.fit_models
                # we just drop rows here for which all the target values are null
                target_nulls = target_nulls.all(axis=1)
            keep_rows = ~(feature_nulls | target_nulls)
            for k, v in features.items():
                features[k] = v[keep_rows]
            target = target[keep_rows]
            df = ufp.filter_with_mask(df, keep_rows)
            df = ufp.copy_if_pandas(df, deep=False)
            last_idxs = self.ga.indptr[1:] - 1
            if self._sort_idxs is not None:
                last_idxs = self._sort_idxs[last_idxs]
            last_vals_nan = ~keep_rows[last_idxs]
            if last_vals_nan.any():
                self._dropped_series: Optional[np.ndarray] = np.where(last_vals_nan)[0]
                dropped_ids = reprlib.repr(list(self.uids[self._dropped_series]))
                warnings.warn(
                    "The following series were dropped completely "
                    f"due to the transformations and features: {dropped_ids}.\n"
                    "These series won't show up if you use `MLForecast.forecast_fitted_values()`.\n"
                    "You can set `dropna=False` or use transformations that require less samples to mitigate this"
                )
            else:
                self._dropped_series = None
        elif isinstance(df, pd.DataFrame):
            df = df.copy(deep=False)
            self._dropped_series = None
        # once we've computed the features and target we can slice the series
        if self.keep_last_n is not None:
            self.ga = self.ga.take_from_groups(slice(-self.keep_last_n, None))
        del self._restore_idxs, self._sort_idxs

        # lag transforms
        for feat in transforms.keys():
            df = ufp.assign_columns(df, feat, features[feat])

        # date features
        names = [f.__name__ if callable(f) else f for f in self.date_features]
        date_features = [f for f, name in zip(self.date_features, names) if name not in df]
        if date_features:
            unique_dates = df[self.time_col].unique()
            if isinstance(df, pd.DataFrame):
                # all kinds of trickery to make this fast
                unique_dates = pd.Index(unique_dates)
                date2pos = {date: i for i, date in enumerate(unique_dates)}
                restore_idxs = df[self.time_col].map(date2pos)
                for feature in date_features:
                    feat_name, feat_vals = self._compute_date_feature(unique_dates, feature)
                    df[feat_name] = feat_vals[restore_idxs]
            elif isinstance(df, pl_DataFrame):
                exprs = []
                for feat in date_features:  # type: ignore
                    name, vals = self._compute_date_feature(pl.col(self.time_col), feat)
                    exprs.append(vals.alias(name))
                feats = unique_dates.to_frame().with_columns(*exprs)
                df = df.join(feats, on=self.time_col, how='left')

        # assemble return
        if return_X_y:
            X = df[self.features_order_]
            if as_numpy:
                X = ufp.to_numpy(X)
            return X, target
        if max_horizon is not None:
            # remove original target
            out_cols = [c for c in df.columns if c != self.target_col]
            df = df[out_cols]
            target_names = [f"{self.target_col}{i}" for i in range(max_horizon)]
            df = ufp.assign_columns(df, target_names, target)
        else:
            if isinstance(df, pd.DataFrame):
                df = _ensure_shallow_copy(df)
            df = ufp.assign_columns(df, self.target_col, target)
        return df
    def fit_transform(
        self,
        data: DFType,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        max_horizon: Optional[int] = None,
        return_X_y: bool = False,
        as_numpy: bool = False,
    ) -> Union[DFType, Tuple[DFType, np.ndarray]]:
        """Add the features to `data` and save the required information for the predictions step.

        If not all features are static, specify which ones are in `static_features`.
        If you don't want to drop rows with null values after the transformations set `dropna=False`.
        If `keep_last_n` is not None then that number of observations is kept across all series for updates.
        """
        self.dropna = dropna
        self.as_numpy = as_numpy
        self._fit(
            df=data,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            keep_last_n=keep_last_n,
        )
        return self._transform(
            df=data,
            dropna=dropna,
            max_horizon=max_horizon,
            return_X_y=return_X_y,
            as_numpy=as_numpy,
        )
    def _update_y(self, new: np.ndarray) -> None:
        """Appends the elements of `new` to every time series.

        These values are used to update the transformations and are stored as predictions."""
        if not hasattr(self, 'y_pred'):
            self.y_pred = []
        self.y_pred.append(new)
        new_arr = np.asarray(new)
        self.ga = self.ga.append(new_arr)
    def _update_features(self) -> DataFrame:
        """Compute the current values of all the features using the latest values of the time series."""
        self.curr_dates: Union[pd.Index, pl_Series] = ufp.offset_times(self.curr_dates, self.freq, 1)
        self.test_dates.append(self.curr_dates)

        features = self._compute_transforms(self.transforms, updates_only=True)

        for feature in self.date_features:
            feat_name, feat_vals = self._compute_date_feature(self.curr_dates, feature)
            features[feat_name] = feat_vals

        if isinstance(self.last_dates, pl_Series):
            df_constructor = pl_DataFrame
        else:
            df_constructor = pd.DataFrame
        features_df = df_constructor(features)[self.features]
        return ufp.horizontal_concat([self.static_features_, features_df])
    def _get_raw_predictions(self) -> np.ndarray:
        return np.array(self.y_pred).ravel('F')

    def _get_future_ids(self, h: int):
        if isinstance(self.uids, pl_Series):
            uids = pl.concat([self.uids for _ in range(h)]).sort()
        else:
            uids = pd.Series(
                np.repeat(self.uids, h), name=self.id_col, dtype=self.uids.dtype
            )
        return uids
    def _get_predictions(self) -> DataFrame:
        """Get all the predicted values with their corresponding ids and datestamps."""
        h = len(self.y_pred)
        if isinstance(self.uids, pl_Series):
            df_constructor = pl_DataFrame
        else:
            df_constructor = pd.DataFrame
        uids = self._get_future_ids(h)
        df = df_constructor(
            {
                self.id_col: uids,
                self.time_col: np.array(self.test_dates).ravel('F'),
                f'{self.target_col}_pred': self._get_raw_predictions(),
            },
        )
        return df
    def _get_features_for_next_step(self, X_df=None):
        new_x = self._update_features()
        if X_df is not None:
            n_series = len(self.uids)
            h = X_df.shape[0] // n_series
            rows = np.arange(self._h, X_df.shape[0], h)
            X = ufp.take_rows(X_df, rows)
            X = ufp.drop_index_if_pandas(X)
            new_x = ufp.horizontal_concat([new_x, X])
        if isinstance(new_x, pd.DataFrame):
            nulls = new_x.isnull().any()
            cols_with_nulls = nulls[nulls].index.tolist()
        else:
            nulls = new_x.select(pl.all().is_null().any())
            cols_with_nulls = [k for k, v in nulls.to_dicts()[0].items() if v]
        if cols_with_nulls:
            warnings.warn(f'Found null values in {", ".join(cols_with_nulls)}.')
        self._h += 1
        new_x = new_x[self.features_order_]
        if self.as_numpy:
            new_x = ufp.to_numpy(new_x)
        return new_x
    @contextmanager
    def _backup(self) -> Iterator[None]:
        # this gets modified during predict because the predictions are appended
        ga = copy.copy(self.ga)
        # if these save state (like ExpandingMean) they'll get modified by the updates
        lag_tfms = copy.deepcopy(self.transforms)
        try:
            yield
        finally:
            self.ga = ga
            self.transforms = lag_tfms
    def _predict_setup(self) -> None:
        # TODO: move to utils
        if isinstance(self.last_dates, pl_Series):
            self.curr_dates = self.last_dates.clone()
        else:
            self.curr_dates = self.last_dates.copy()
        self.test_dates: List[Union[pd.Index, pl_Series]] = []
        self.y_pred = []
        self._h = 0
    def _predict_recursive(
        self,
        models: Dict[str, BaseEstimator],
        horizon: int,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        X_df: Optional[DFType] = None,
    ) -> DFType:
        """Use `model` to predict the next `horizon` timesteps."""
        for i, (name, model) in enumerate(models.items()):
            with self._backup():
                self._predict_setup()
                for _ in range(horizon):
                    new_x = self._get_features_for_next_step(X_df)
                    if before_predict_callback is not None:
                        new_x = before_predict_callback(new_x)
                    predictions = model.predict(new_x)
                    if after_predict_callback is not None:
                        predictions = after_predict_callback(predictions)
                    self._update_y(predictions)
                if i == 0:
                    preds = self._get_predictions()
                    rename_dict = {f'{self.target_col}_pred': name}
                    preds = ufp.rename(preds, rename_dict)
                else:
                    raw_preds = self._get_raw_predictions()
                    preds = ufp.assign_columns(preds, name, raw_preds)
        return preds
    def _predict_multi(
        self,
        models: Dict[str, BaseEstimator],
        horizon: int,
        before_predict_callback: Optional[Callable] = None,
        X_df: Optional[DFType] = None,
    ) -> DFType:
        assert self.max_horizon is not None
        if horizon > self.max_horizon:
            raise ValueError(f'horizon must be at most max_horizon ({self.max_horizon})')
        self._predict_setup()
        uids = self._get_future_ids(horizon)
        starts = ufp.offset_times(self.curr_dates, self.freq, 1)
        dates = ufp.time_ranges(starts, self.freq, periods=horizon)
        if isinstance(self.curr_dates, pl_Series):
            df_constructor = pl_DataFrame
        else:
            df_constructor = pd.DataFrame
        result = df_constructor({self.id_col: uids, self.time_col: dates})
        for name, model in models.items():
            with self._backup():
                new_x = self._get_features_for_next_step(X_df)
                if before_predict_callback is not None:
                    new_x = before_predict_callback(new_x)
                predictions = np.empty((new_x.shape[0], horizon))
                for i in range(horizon):
                    predictions[:, i] = model[i].predict(new_x)
                raw_preds = predictions.ravel()
                result = ufp.assign_columns(result, name, raw_preds)
        return result
    def _has_ga_target_tfms(self):
        return any(isinstance(tfm, _BaseGroupedArrayTargetTransform) for tfm in self.target_transforms)

    @contextmanager
    def _maybe_subset(self, idxs: Optional[np.ndarray]) -> Iterator[None]:
        # save original
        ga = self.ga
        uids = self.uids
        statics = self.static_features_
        last_dates = self.last_dates
        targ_tfms = copy.copy(self.target_transforms)
        lag_tfms = copy.deepcopy(self.transforms)

        if idxs is not None:
            # assign subsets
            self.ga = self.ga.take(idxs)
            self.uids = uids[idxs]
            self.static_features_ = ufp.take_rows(statics, idxs)
            self.static_features_ = ufp.drop_index_if_pandas(self.static_features_)
            self.last_dates = last_dates[idxs]
            if self.target_transforms is not None:
                for i, tfm in enumerate(self.target_transforms):
                    if isinstance(tfm, _BaseGroupedArrayTargetTransform):
                        self.target_transforms[i] = tfm.take(idxs)
            for name, lag_tfm in self.transforms.items():
                if isinstance(lag_tfm, _BaseLagTransform):
                    lag_tfm = lag_tfm.take(idxs)
                self.transforms[name] = lag_tfm
        try:
            yield
        finally:
            self.ga = ga
            self.uids = uids
            self.static_features_ = statics
            self.last_dates = last_dates
            self.target_transforms = targ_tfms
            self.transforms = lag_tfms
    def predict(
        self,
        models: Dict[str, Union[BaseEstimator, List[BaseEstimator]]],
        horizon: int,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        X_df: Optional[DFType] = None,
        ids: Optional[List[str]] = None,
    ) -> DFType:
        if ids is not None:
            unseen = set(ids) - set(self.uids)
            if unseen:
                raise ValueError(f"The following ids weren't seen during training and thus can't be forecasted: {unseen}")
            idxs: Optional[np.ndarray] = np.where(ufp.is_in(self.uids, ids))[0]
        else:
            idxs = None
        with self._maybe_subset(idxs):
            if X_df is not None:
                if self.id_col not in X_df or self.time_col not in X_df:
                    raise ValueError(f"X_df must have '{self.id_col}' and '{self.time_col}' columns.")
                if X_df.shape[1] < 3:
                    raise ValueError("Found no exogenous features in `X_df`.")
                statics = [c for c in self.static_features_.columns if c != self.id_col]
                dynamics = [c for c in X_df.columns if c not in [self.id_col, self.time_col]]
                common = [c for c in dynamics if c in statics]
                if common:
                    raise ValueError(
                        f"The following features were provided through `X_df` but were considered as static during fit: {common}.\n"
                        "Please re-run the fit step using the `static_features` argument to indicate which features are static. "
                        "If all your features are dynamic please pass an empty list (static_features=[])."
                    )
                starts = ufp.offset_times(self.last_dates, self.freq, 1)
                ends = ufp.offset_times(self.last_dates, self.freq, horizon)
                dates_validation = type(X_df)(
                    {
                        self.id_col: self.uids,
                        '_start': starts,
                        '_end': ends,
                    }
                )
                X_df = ufp.join(X_df, dates_validation, on=self.id_col)
                mask = ufp.between(X_df[self.time_col], X_df['_start'], X_df['_end'])
                X_df = ufp.filter_with_mask(X_df, mask)
                if X_df.shape[0] != len(self.uids) * horizon:
                    msg = (
                        "Found missing inputs in X_df. "
                        "It should have one row per id and time for the complete forecasting horizon.\n"
                        "You can get the expected structure by running `MLForecast.make_future_dataframe(h)` "
                        "or get the missing combinations in your current `X_df` by running `MLForecast.get_missing_future(h, X_df)`."
                    )
                    raise ValueError(msg)
                drop_cols = [self.id_col, self.time_col, '_start', '_end']
                X_df = ufp.sort(X_df, [self.id_col, self.time_col])
                X_df = ufp.drop_columns(X_df, drop_cols)
            if getattr(self, 'max_horizon', None) is None:
                preds = self._predict_recursive(
                    models=models,
                    horizon=horizon,
                    before_predict_callback=before_predict_callback,
                    after_predict_callback=after_predict_callback,
                    X_df=X_df,
                )
            else:
                preds = self._predict_multi(
                    models=models,
                    horizon=horizon,
                    before_predict_callback=before_predict_callback,
                    X_df=X_df,
                )
            if self.target_transforms is not None:
                if self._has_ga_target_tfms():
                    model_cols = [c for c in preds.columns if c not in (self.id_col, self.time_col)]
                    indptr = np.arange(0, horizon * (len(self.uids) + 1), horizon)
                for tfm in self.target_transforms[::-1]:
                    if isinstance(tfm, _BaseGroupedArrayTargetTransform):
                        for col in model_cols:
                            ga = GroupedArray(preds[col].to_numpy().astype(self.ga.data.dtype), indptr)
                            ga = tfm.inverse_transform(ga)
                            preds = ufp.assign_columns(preds, col, ga.data)
                    else:
                        preds = tfm.inverse_transform(preds)
        return preds
    def save(self, path: Union[str, Path]) -> None:
        with fsspec.open(path, 'wb') as f:
            cloudpickle.dump(self, f)

    @staticmethod
    def load(
        path: Union[str, Path], protocol: Optional[str] = None
    ) -> 'TimeSeries':
        with fsspec.open(path, 'rb', protocol=protocol) as f:
            ts = cloudpickle.load(f)
        return ts
    def update(self, df: DataFrame) -> None:
        """Update the values of the stored series."""
        validate_format(df, self.id_col, self.time_col, self.target_col)
        uids = self.uids
        if isinstance(uids, pd.Index):
            uids = pd.Series(uids)
        uids, new_ids = ufp.match_if_categorical(uids, df[self.id_col])
        df = ufp.copy_if_pandas(df, deep=False)
        df = ufp.assign_columns(df, self.id_col, new_ids)
        df = ufp.sort(df, by=[self.id_col, self.time_col])
        values = df[self.target_col].to_numpy()
        values = values.astype(self.ga.data.dtype, copy=False)
        id_counts = ufp.counts_by_id(df, self.id_col)
        try:
            sizes = ufp.join(uids, id_counts, on=self.id_col, how='outer_coalesce')
        except (KeyError, ValueError):
            # pandas raises key error, polars before coalesce raises value error
            sizes = ufp.join(uids, id_counts, on=self.id_col, how='outer')
        sizes = ufp.fill_null(sizes, {'counts': 0})
        sizes = ufp.sort(sizes, by=self.id_col)
        new_groups = ~ufp.is_in(sizes[self.id_col], uids)
        last_dates = ufp.group_by_agg(df, self.id_col, {self.time_col: 'max'})
        last_dates = ufp.join(sizes, last_dates, on=self.id_col, how='left')
        curr_last_dates = type(df)({self.id_col: uids, '_curr': self.last_dates})
        last_dates = ufp.join(last_dates, curr_last_dates, on=self.id_col, how='left')
        last_dates = ufp.fill_null(last_dates, {self.time_col: last_dates['_curr']})
        last_dates = ufp.sort(last_dates, by=self.id_col)
        self.last_dates = ufp.cast(last_dates[self.time_col], self.last_dates.dtype)
        self.uids = ufp.sort(sizes[self.id_col])
        if isinstance(df, pd.DataFrame):
            self.uids = pd.Index(self.uids)
            self.last_dates = pd.Index(self.last_dates)
        if new_groups.any():
            if self.target_transforms is not None:
                raise ValueError('Can not update target_transforms with new series.')
            new_ids = ufp.filter_with_mask(sizes[self.id_col], new_groups)
            new_ids_df = ufp.filter_with_mask(df, ufp.is_in(df[self.id_col], new_ids))
            new_ids_counts = ufp.counts_by_id(new_ids_df, self.id_col)
            new_statics = ufp.take_rows(df, new_ids_counts["counts"].to_numpy().cumsum() - 1)
            new_statics = new_statics[self.static_features_.columns]
            self.static_features_ = ufp.vertical_concat([self.static_features_, new_statics])
            self.static_features_ = ufp.sort(self.static_features_, self.id_col)
        if self.target_transforms is not None:
            if self._has_ga_target_tfms():
                indptr = np.append(0, id_counts['counts']).cumsum()
            for tfm in self.target_transforms:
                if isinstance(tfm, _BaseGroupedArrayTargetTransform):
                    ga = GroupedArray(values, indptr)
                    ga = tfm.update(ga)
                    df = ufp.assign_columns(df, self.target_col, ga.data)
                else:
                    df = tfm.update(df)
                values = df[self.target_col].to_numpy()
        self.ga = self.ga.append_several(
            new_sizes=sizes['counts'].to_numpy().astype(np.int32),
            new_values=values,
            new_groups=new_groups.to_numpy(),
        )
test_fail(lambda: TimeSeries(freq='D', lags=list(range(2))), contains='lags must be positive integers')
test_fail(lambda: TimeSeries(freq='D', lag_transforms={0: 1}), contains='keys of lag_transforms must be positive integers')
# differences
n = 7 * 14
x = pd.DataFrame(
    {
        'id': np.repeat(0, n),
        'ds': np.arange(n),
        'y': np.arange(7)[[x % 7 for x in np.arange(n)]],
    }
)
x['y'] = x['ds'] * 0.1 + x['y']
ts = TimeSeries(freq=1, target_transforms=[Differences([1, 7])])
ts._fit(x.iloc[:-14], id_col='id', time_col='ds', target_col='y')
ts.as_numpy = False
np.testing.assert_allclose(
    x['y'].diff(1).diff(7).values[:-14],
    ts.ga.data,
)
ts.y_pred = np.zeros(14)

class A:
    def fit(self, X):
        return self

    def predict(self, X):
        return np.zeros(X.shape[0])

xx = ts.predict({'A': A()}, 14)
np.testing.assert_allclose(xx['A'], x['y'].tail(14).values)
# transform namer
def namer(f, lag, *args):
    return f'hello_from_{f.__name__}'

ts = TimeSeries(
    freq=1,
    lag_transforms={1: [(rolling_mean, 7), expanding_mean]},
    lag_transforms_namer=namer,
)
transformed = ts.fit_transform(x, id_col='id', time_col='ds', target_col='y')
test_eq(
    transformed.columns.tolist(),
    ['id', 'ds', 'y', 'hello_from_rolling_mean', 'hello_from_expanding_mean'],
)
test_fail(lambda: TimeSeries(freq=1, date_features=[lambda: 1]), contains="Can't use a lambda")
The TimeSeries class takes care of defining the transformations to be performed (lags, lag_transforms and date_features). The transformations can be computed using multithreading if num_threads > 1.
def month_start_or_end(dates):
return dates.is_month_start | dates.is_month_end
flow_config = dict(
    freq='W-THU',
    lags=[7],
    lag_transforms={
        1: [expanding_mean, (rolling_mean, 7)]
    },
    date_features=['dayofweek', 'week', month_start_or_end]
)

ts = TimeSeries(**flow_config)
ts
TimeSeries(freq=W-THU, transforms=['lag7', 'expanding_mean_lag1', 'rolling_mean_lag1_window_size7'], date_features=['dayofweek', 'week', 'month_start_or_end'], num_threads=1)
test_eq(
    TimeSeries(freq=ts.freq).freq,
    TimeSeries(freq='W-THU').freq
)
The frequency is converted to an offset.
test_eq(ts.freq, pd.tseries.frequencies.to_offset(flow_config['freq']))
The date features are stored as they were passed to the constructor.
test_eq(ts.date_features, flow_config['date_features'])
The transformations are stored as a dictionary where the key is the name of the transformation (the column name in the dataframe with the computed features), which is built using build_transform_name, and the value is a tuple where the first element is the lag it is applied to, followed by the function and the function arguments.
test_eq(
    ts.transforms,
    {
        'lag7': Lag(7),
        'expanding_mean_lag1': (1, expanding_mean),
        'rolling_mean_lag1_window_size7': (1, rolling_mean, 7),
    }
)
Note that for lags we define the transformation as the identity function applied to its corresponding lag. This is because _transform_series takes the lag as an argument and shifts the array before computing the transformation.
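To see what that identity-on-a-lag amounts to, here's a minimal sketch (not part of the library) using the shift_array helper imported above: lag k is just the series shifted forward k positions, padded with np.nan.

y_demo = np.array([1.0, 2.0, 3.0, 4.0])
# the "identity transform on lag 2" is simply the shifted array
np.testing.assert_equal(shift_array(y_demo, 2), np.array([np.nan, np.nan, 1.0, 2.0]))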
# int y is converted to float32
serie2 = serie.copy()
serie2['y'] = serie2['y'].astype(int)
ts = TimeSeries(num_threads=1, freq='D')
ts._fit(serie2, id_col='unique_id', time_col='ds', target_col='y')
test_eq(ts.ga.data.dtype, np.float32)
# _compute_transforms
y = serie.y.values
lag_1 = shift_array(y, 1)

for num_threads in (1, 2):
    ts = TimeSeries(**flow_config)
    ts._fit(serie, id_col='unique_id', time_col='ds', target_col='y')
    transforms = ts._compute_transforms(ts.transforms, updates_only=False)

    np.testing.assert_equal(transforms['lag7'], shift_array(y, 7))
    np.testing.assert_equal(transforms['expanding_mean_lag1'], expanding_mean(lag_1))
    np.testing.assert_equal(transforms['rolling_mean_lag1_window_size7'], rolling_mean(lag_1, 7))
# _update_y
ts = TimeSeries(freq='D', lags=[1])
ts._fit(serie, id_col='unique_id', time_col='ds', target_col='y')

max_size = np.diff(ts.ga.indptr)
ts._update_y([1])
ts._update_y([2])

test_eq(np.diff(ts.ga.indptr), max_size + 2)
test_eq(ts.ga.data[-2:], [1, 2])
# _update_features
ts = TimeSeries(**flow_config)
ts._fit(serie, id_col='unique_id', time_col='ds', target_col='y')
ts._predict_setup()
updates = ts._update_features()

last_date = serie['ds'].max()
first_prediction_date = last_date + pd.offsets.Day()

# these have an offset because we can now "see" our last y value
expected = pd.DataFrame({
    'unique_id': ts.uids,
    'lag7': shift_array(y, 6)[-1],
    'expanding_mean_lag1': expanding_mean(y)[-1],
    'rolling_mean_lag1_window_size7': rolling_mean(y, 7)[-1],
    'dayofweek': np.uint8([getattr(first_prediction_date, 'dayofweek')]),
    'week': np.uint8([first_prediction_date.isocalendar()[1]]),
    'month_start_or_end': month_start_or_end(first_prediction_date)
})
statics = serie.tail(1).drop(columns=['ds', 'y'])
pd.testing.assert_frame_equal(updates, statics.merge(expected))
test_eq(ts.curr_dates[0], first_prediction_date)
# _get_predictions
ts = TimeSeries(freq='D', lags=[1])
ts._fit(serie, id_col='unique_id', time_col='ds', target_col='y')
ts._predict_setup()
ts._update_features()
ts._update_y([1.])
preds = ts._get_predictions()

last_ds = serie['ds'].max()
expected = pd.DataFrame({'unique_id': serie['unique_id'][[0]], 'ds': [last_ds + pd.offsets.Day()], 'y_pred': [1.]})
pd.testing.assert_frame_equal(preds, expected)
show_doc(TimeSeries.fit_transform, title_level=2)
TimeSeries.fit_transform
TimeSeries.fit_transform (data:Union[pandas.core.frame.DataFrame,polars.dataframe.frame.DataFrame], id_col:str, time_col:str, target_col:str, static_features:Optional[List[str]]=None, dropna:bool=True, keep_last_n:Optional[int]=None, max_horizon:Optional[int]=None, return_X_y:bool=False, as_numpy:bool=False)

*Add the features to data and save the required information for the predictions step.

If not all features are static, specify which ones are in static_features. If you don't want to drop rows with null values after the transformations set dropna=False. If keep_last_n is not None then that number of observations is kept across all series for updates.*
flow_config = dict(
    freq='D',
    lags=[7, 14],
    lag_transforms={
        2: [
            (rolling_mean, 7),
            (rolling_mean, 14),
        ]
    },
    date_features=['dayofweek', 'month', 'year'],
    num_threads=2
)

ts = TimeSeries(**flow_config)
_ = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
The series values are stored as a GroupedArray in the ga attribute. If the series values are integers they are converted to np.float32; this is because lags generate np.nans, so we need a float dtype to represent them.
np.testing.assert_equal(ts.ga.data, series.y.values)
The series ids are stored in the uids attribute.
test_eq(ts.uids, series['unique_id'].unique())
For each time series the last observed date is stored, so that the predictions start from the last date + the frequency.
test_eq(ts.last_dates, series.groupby('unique_id', observed=True)['ds'].max().values)
The last row of every series, without the y and ds columns, is taken as the static features.
pd.testing.assert_frame_equal(
    ts.static_features_,
    series.groupby('unique_id', observed=True).tail(1).drop(columns=['ds', 'y']).reset_index(drop=True),
)
If you pass static_features to TimeSeries.fit_transform then only those are kept.
ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y', static_features=['static_0'])

pd.testing.assert_frame_equal(
    ts.static_features_,
    series.groupby('unique_id', observed=True).tail(1)[['unique_id', 'static_0']].reset_index(drop=True),
)
You can also specify keep_last_n in TimeSeries.fit_transform, which means that after computing the features for training we want to keep only the last n samples of each time series for the updates. This saves both memory and time, since the updates are performed by running the transformation functions on all time series again and keeping only the last value (which is the update).

If you have very long time series and your updates only require a small sample it's recommended that you set keep_last_n to the minimum number of samples required to compute the updates, which in this case is 15 since we have a rolling mean of size 14 over the lag 2 and in the first update the lag 2 becomes the lag 1. This is because in the first update the lag 1 is the last value of the series (or the lag 0), the lag 2 is the lag 1 and so on.
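As a quick sanity check of that arithmetic, here's a small sketch (the helper below is illustrative, not part of the library):

# values for a rolling transform on `lag` live at lags lag .. lag + window_size - 1;
# after appending one prediction every lag shifts down by one,
# so lag - 1 + window_size stored samples suffice for the first update
def min_keep_last_n(lag: int, window_size: int) -> int:
    return lag - 1 + window_size

assert min_keep_last_n(lag=2, window_size=14) == 15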
keep_last_n = 15

ts = TimeSeries(**flow_config)
df = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y', keep_last_n=keep_last_n)
ts._predict_setup()

expected_lags = ['lag7', 'lag14']
expected_transforms = ['rolling_mean_lag2_window_size7',
                       'rolling_mean_lag2_window_size14']
expected_date_features = ['dayofweek', 'month', 'year']

test_eq(ts.features, expected_lags + expected_transforms + expected_date_features)
test_eq(ts.static_features_.columns.tolist() + ts.features, df.columns.drop(['ds', 'y']).tolist())
# we dropped 2 rows because of the lag 2 and 13 more to have the complete window of size 14
test_eq(df.shape[0], series.shape[0] - (2 + 13) * ts.ga.n_groups)
test_eq(ts.ga.data.size, ts.ga.n_groups * keep_last_n)
TimeSeries.fit_transform requires that the y column doesn't have any null values. This is because the transformations could propagate them forward, so if you have null values in the y column you'll get an error.
series_with_nulls = series.copy()
series_with_nulls.loc[1, 'y'] = np.nan
test_fail(
    lambda: ts.fit_transform(series_with_nulls, id_col='unique_id', time_col='ds', target_col='y'),
    contains='y column contains null values'
)
# unsorted df
ts = TimeSeries(**flow_config)
df = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
unordered_series = series.sample(frac=1.0)
assert not unordered_series.set_index('ds', append=True).index.is_monotonic_increasing
df2 = ts.fit_transform(unordered_series, id_col='unique_id', time_col='ds', target_col='y')
pd.testing.assert_frame_equal(
    df.reset_index(drop=True),
    df2.sort_values(['unique_id', 'ds']).reset_index(drop=True)
)
# existing features are not recomputed
df_with_features = pd.DataFrame({
    'unique_id': [1, 1, 1],
    'ds': pd.date_range('2000-01-01', freq='D', periods=3),
    'y': [10., 11., 12.],
    'lag1': [1, 1, 1],
    'month': [12, 12, 12],
})
ts = TimeSeries(freq='D', lags=[1, 2], date_features=['year', 'month'])
transformed = ts.fit_transform(df_with_features, id_col='unique_id', time_col='ds', target_col='y', dropna=False)
pd.testing.assert_series_equal(transformed['lag1'], df_with_features['lag1'])
pd.testing.assert_series_equal(transformed['month'], df_with_features['month'])
np.testing.assert_array_equal(transformed['year'], 3 * [2000])
np.testing.assert_array_equal(transformed['lag2'].values, [np.nan, np.nan, 10.])
# non-standard column names
ts = TimeSeries(**flow_config)
df = ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
non_std_series = series.reset_index().rename(columns={'unique_id': 'some_id', 'ds': 'timestamp', 'y': 'value'})
non_std_res = ts.fit_transform(
    non_std_series, id_col='some_id', time_col='timestamp', target_col='value', static_features=[]
)
non_std_res = non_std_res.reset_index(drop=True)
pd.testing.assert_frame_equal(
    df.reset_index(),
    non_std_res.rename(columns={'timestamp': 'ds', 'value': 'y', 'some_id': 'unique_id'})
)
# integer timestamps
def identity(x):
    return x

flow_config_int_ds = copy.deepcopy(flow_config)
flow_config_int_ds['date_features'] = [identity]
flow_config_int_ds['freq'] = 1
ts = TimeSeries(**flow_config_int_ds)
int_ds_series = series.copy()
int_ds_series['ds'] = int_ds_series['ds'].astype('int64')
int_ds_res = ts.fit_transform(int_ds_series, id_col='unique_id', time_col='ds', target_col='y')
int_ds_res['ds'] = pd.to_datetime(int_ds_res['ds'])
int_ds_res['identity'] = pd.to_datetime(int_ds_res['identity'])
df2 = df.drop(columns=flow_config['date_features'])
df2['identity'] = df2['ds']
pd.testing.assert_frame_equal(df2, int_ds_res)
show_doc(TimeSeries.predict, title_level=2)
TimeSeries.predict
TimeSeries.predict (models:Dict[str,Union[sklearn.base.BaseEstimator,List[sklearn.base.BaseEstimator]]], horizon:int, before_predict_callback:Optional[Callable]=None, after_predict_callback:Optional[Callable]=None, X_df:Union[pandas.core.frame.DataFrame,polars.dataframe.frame.DataFrame,NoneType]=None, ids:Optional[List[str]]=None)
Once we have a trained model we can use TimeSeries.predict, passing the models and the horizon, to get the predictions.
class DummyModel:
    def predict(self, X: pd.DataFrame) -> np.ndarray:
        return X['lag7'].values

horizon = 7
model = DummyModel()
ts = TimeSeries(**flow_config)
ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
predictions = ts.predict({'DummyModel': model}, horizon)

grouped_series = series.groupby('unique_id', observed=True)
expected_preds = grouped_series['y'].tail(7)  # the model predicts the lag 7
last_dates = grouped_series['ds'].max()
expected_dsmin = last_dates + pd.offsets.Day()
expected_dsmax = last_dates + horizon * pd.offsets.Day()
grouped_preds = predictions.groupby('unique_id', observed=True)

np.testing.assert_allclose(predictions['DummyModel'], expected_preds)
pd.testing.assert_series_equal(grouped_preds['ds'].min(), expected_dsmin)
pd.testing.assert_series_equal(grouped_preds['ds'].max(), expected_dsmax)
model = DummyModel()
ts = TimeSeries(**flow_config)
ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y')
predictions = ts.predict({'DummyModel': model}, horizon=horizon)

ts = TimeSeries(**flow_config_int_ds)
ts.fit_transform(int_ds_series, id_col='unique_id', time_col='ds', target_col='y')
int_ds_predictions = ts.predict({'DummyModel': model}, horizon=horizon)
pd.testing.assert_frame_equal(predictions.drop(columns='ds'), int_ds_predictions.drop(columns='ds'))
If we have dynamic features we can pass them to X_df.
class PredictPrice:
    def predict(self, X):
        return X['price']

series = generate_daily_series(20, n_static_features=2, equal_ends=True)
dynamic_series = series.rename(columns={'static_1': 'product_id'})
prices_catalog = generate_prices_for_series(dynamic_series)
series_with_prices = dynamic_series.merge(prices_catalog, how='left')

model = PredictPrice()
ts = TimeSeries(**flow_config)
ts.fit_transform(
    series_with_prices,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
    static_features=['static_0', 'product_id'],
)
predictions = ts.predict({'PredictPrice': model}, horizon=1, X_df=prices_catalog)

pd.testing.assert_frame_equal(
    predictions.rename(columns={'PredictPrice': 'price'}),
    prices_catalog.merge(predictions[['unique_id', 'ds']])[['unique_id', 'ds', 'price']]
)
# predicting a subset
sample_ids = ['id_00', 'id_16']
sample_preds = ts.predict({'price': model}, 1, X_df=prices_catalog, ids=sample_ids)
pd.testing.assert_frame_equal(
    sample_preds,
    prices_catalog.merge(predictions[predictions['unique_id'].isin(sample_ids)][['unique_id', 'ds']])[['unique_id', 'ds', 'price']]
)
test_fail(lambda: ts.predict({'y': model}, 1, ids=['bonjour']), contains="{'bonjour'}")
show_doc(TimeSeries.update, title_level=2)
TimeSeries.update
TimeSeries.update (df:Union[pandas.core.frame.DataFrame,polars.dataframe .frame.DataFrame])
Update the values of the stored series.
class SeasonalNaiveModel:
    def predict(self, X):
        return X['lag7']

class NaiveModel:
    def predict(self, X: pd.DataFrame):
        return X['lag1']

two_series = series[series['unique_id'].isin(['id_00', 'id_19'])].copy()
two_series['unique_id'] = pd.Categorical(two_series['unique_id'], ['id_00', 'id_19'])
ts = TimeSeries(freq='D', lags=[1], date_features=['dayofweek'])
ts.fit_transform(
    two_series,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
)
last_vals_two_series = two_series.groupby('unique_id', observed=True).tail(1)
last_val_id0 = last_vals_two_series[lambda x: x['unique_id'].eq('id_00')].copy()
new_values = last_val_id0.copy()
new_values['ds'] += pd.offsets.Day()
new_serie = pd.DataFrame({
    'unique_id': ['new_idx', 'new_idx'],
    'ds': pd.to_datetime(['2020-01-01', '2020-01-02']),
    'y': [5.0, 6.0],
    'static_0': [0, 0],
    'static_1': [1, 1],
})
new_values = pd.concat([new_values, new_serie])
ts.update(new_values)
preds = ts.predict({'Naive': NaiveModel()}, 1)
expected_id0 = last_val_id0.copy()
expected_id0['ds'] += pd.offsets.Day()
expected_id1 = last_vals_two_series[lambda x: x['unique_id'].eq('id_19')].copy()
last_val_new_serie = new_serie.tail(1)[['unique_id', 'ds', 'y']]
expected = pd.concat([expected_id0, expected_id1, last_val_new_serie])
expected = expected[['unique_id', 'ds', 'y']]
expected = expected.rename(columns={'y': 'Naive'}).reset_index(drop=True)
expected['unique_id'] = pd.Categorical(expected['unique_id'], categories=['id_00', 'id_19', 'new_idx'])
expected['ds'] += pd.offsets.Day()
pd.testing.assert_frame_equal(preds, expected)
pd.testing.assert_frame_equal(
    ts.static_features_,
    (
        pd.concat([last_vals_two_series, new_serie.tail(1)])
        [['unique_id', 'static_0', 'static_1']]
        .astype(ts.static_features_.dtypes)
        .reset_index(drop=True)
    )
)

# with target transforms
ts = TimeSeries(
    freq='D',
    lags=[7],
    target_transforms=[Differences([1, 2]), LocalStandardScaler()],
)
ts.fit_transform(two_series, id_col='unique_id', time_col='ds', target_col='y')
new_values = two_series.groupby('unique_id', observed=True).tail(7).copy()
new_values['ds'] += 7 * pd.offsets.Day()
orig_last7 = ts.ga.take_from_groups(slice(-7, None)).data
ts.update(new_values)
preds = ts.predict({'SeasonalNaive': SeasonalNaiveModel()}, 7)
np.testing.assert_allclose(
    new_values['y'].values,
    preds['SeasonalNaive'].values,
)
last7 = ts.ga.take_from_groups(slice(-7, None)).data
assert 0 < np.abs(last7 / orig_last7 - 1).mean() < 0.5
#| polars
two_series = generate_daily_series(2, n_static_features=2, engine='polars')
ts = TimeSeries(freq='1d', lags=[1], date_features=['weekday'])
ts.fit_transform(
    two_series,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
)
last_vals_two_series = two_series.join(
    two_series.group_by('unique_id').agg(pl.col('ds').max()), on=['unique_id', 'ds']
)
last_val_id0 = last_vals_two_series.filter(pl.col('unique_id') == 'id_0')
new_values = last_val_id0.with_columns(
    pl.col('unique_id').cast(pl.Categorical),
    pl.col('ds').dt.offset_by('1d'),
    pl.col('static_0').cast(pl.Int64),
    pl.col('static_1').cast(pl.Int64),
)
new_serie = pl.DataFrame({
    'unique_id': ['new_idx', 'new_idx'],
    'ds': [datetime.datetime(2020, 1, 1), datetime.datetime(2020, 1, 2)],
    'y': [5.0, 6.0],
    'static_0': [0, 0],
    'static_1': [1, 1],
}).with_columns(
    pl.col('ds').dt.cast_time_unit('ns'),
    pl.col('unique_id').cast(pl.Categorical),
)
new_values = pl.concat([new_values, new_serie])
ts.update(new_values)
preds = ts.predict({'Naive': NaiveModel()}, 1)
expected_id0 = last_val_id0.with_columns(pl.col('ds').dt.offset_by('1d'))
expected_id1 = last_vals_two_series.filter(pl.col('unique_id') == 'id_1')
last_val_new_serie = new_serie.tail(1)
expected = pl.concat([expected_id0, expected_id1])
expected = ufp.vertical_concat([expected, last_val_new_serie])
pd.testing.assert_series_equal(
    expected['unique_id'].cat.get_categories().to_pandas(),
    pd.Series(['id_0', 'id_1', 'new_idx'], name='unique_id')
)
expected = expected[['unique_id', 'ds', 'y']]
expected = ufp.rename(expected, {'y': 'Naive'})
expected = expected.with_columns(pl.col('ds').dt.offset_by('1d'))
pd.testing.assert_frame_equal(preds.to_pandas(), expected.to_pandas())
pd.testing.assert_frame_equal(
    ts.static_features_.to_pandas(),
    (
        ufp.vertical_concat([last_vals_two_series, new_serie.tail(1)])
        [['unique_id', 'static_0', 'static_1']]
        .to_pandas()
        .astype(ts.static_features_.to_pandas().dtypes)
        .reset_index(drop=True)
    )
)

# with target transforms
ts = TimeSeries(
    freq='1d',
    lags=[7],
    target_transforms=[Differences([1, 2]), LocalStandardScaler()],
)
ts.fit_transform(two_series, id_col='unique_id', time_col='ds', target_col='y')
new_values = two_series.group_by('unique_id').tail(7)
new_values = new_values.with_columns(pl.col('ds').dt.offset_by('7d'))
orig_last7 = ts.ga.take_from_groups(slice(-7, None)).data
ts.update(new_values)
preds = ts.predict({'SeasonalNaive': SeasonalNaiveModel()}, 7)
np.testing.assert_allclose(
    new_values['y'].to_numpy(),
    preds['SeasonalNaive'].to_numpy(),
)
last7 = ts.ga.take_from_groups(slice(-7, None)).data
assert 0 < np.abs(last7 / orig_last7 - 1).mean() < 0.5
sys:1: CategoricalRemappingWarning: Local categoricals have different encodings, expensive re-encoding is done to perform this merge operation. Consider using a StringCache or an Enum type if the categories are known in advance
# target_transform with keep_last_n
ts = TimeSeries(freq='D', lags=[1], target_transforms=[LocalStandardScaler()])
ts.fit_transform(series, id_col='unique_id', time_col='ds', target_col='y', keep_last_n=10)
preds = ts.predict({'y': NaiveModel()}, 1)
expected = series.groupby('unique_id', observed=True).tail(1)[['unique_id', 'ds', 'y']].reset_index(drop=True)
expected['ds'] += pd.offsets.Day()
pd.testing.assert_frame_equal(preds, expected)
# raise an error when the static_features argument was omitted at fit and the features are passed as dynamic at predict
valid = series.groupby('unique_id', observed=True).tail(10)
train = series.drop(valid.index)
ts = TimeSeries(freq='D', lags=[1], target_transforms=[LocalStandardScaler()])
ts.fit_transform(train, id_col='unique_id', time_col='ds', target_col='y', keep_last_n=10)
test_fail(lambda: ts.predict({'y': NaiveModel()}, 1, X_df=valid.drop(columns=['y'])), contains="['static_0', 'static_1']")
#| polars
series_pl = generate_daily_series(5, static_as_categorical=False, n_static_features=5, engine='polars')
series_pd = generate_daily_series(5, static_as_categorical=False, n_static_features=5, engine='pandas')
series_pl = series_pl.with_columns(pl.col('unique_id').cast(str))
series_pd['unique_id'] = series_pd['unique_id'].astype(str)

cfg = dict(
    lags=[1, 2, 3, 4],
    lag_transforms={
        1: [expanding_mean, (rolling_mean, 7), (rolling_mean, 14)],
        2: [expanding_mean, (rolling_mean, 7), (rolling_mean, 14)],
        3: [expanding_mean, (rolling_mean, 7), (rolling_mean, 14)],
        4: [expanding_mean, (rolling_mean, 7), (rolling_mean, 14)],
    },
    date_features=['day', 'month', 'quarter', 'year'],
    target_transforms=[Differences([1])],
)
feats_pl = SaveFeatures()
ts_pl = TimeSeries(freq='1d', **cfg)
prep_pl = ts_pl.fit_transform(series_pl, 'unique_id', 'ds', 'y')
fcst_pl = ts_pl.predict({'y': NaiveModel()}, 2, before_predict_callback=feats_pl)

feats_pd = SaveFeatures()
ts_pd = TimeSeries(freq='1D', **cfg)
prep_pd = ts_pd.fit_transform(series_pd, 'unique_id', 'ds', 'y')
fcst_pd = ts_pd.predict({'y': NaiveModel()}, 2, before_predict_callback=feats_pd)

prep_pd = prep_pd.reset_index(drop=True)
prep_pl = prep_pl.to_pandas()
fcst_pl = fcst_pl.to_pandas()
# date features have different dtypes
pd.testing.assert_frame_equal(prep_pl, prep_pd, check_dtype=False)
pd.testing.assert_frame_equal(
    feats_pl.get_features(with_step=True).to_pandas(),
    feats_pd.get_features(with_step=True),
    check_dtype=False,
)
pd.testing.assert_frame_equal(fcst_pl, fcst_pd)
# dropped series
for ordered in [True, False]:
    series = generate_daily_series(10, min_length=5, max_length=20)
    if not ordered:
        series = series.sample(frac=1.0, random_state=40)
    ts = TimeSeries(freq='D', lags=[10])
    with warnings.catch_warnings(record=True):
        prep = ts.fit_transform(series, 'unique_id', 'ds', 'y')
    dropped = ts.uids[ts._dropped_series].tolist()
    assert not prep['unique_id'].isin(dropped).any()
    assert set(prep['unique_id'].unique().tolist() + dropped) == set(series['unique_id'].unique())
# short series exception
series = generate_daily_series(2, min_length=5, max_length=15)
ts = TimeSeries(freq='D', lags=[1], target_transforms=[Differences([20])])
test_fail(
    lambda: ts.fit_transform(series, 'unique_id', 'ds', 'y'),
    contains="are too short for the 'Differences' transformation"
)
# test predict
class Lag1PlusOneModel:
    def predict(self, X):
        return X['lag1'] + 1

ts = TimeSeries(freq='D', lags=[1])
for max_horizon in [None, 2]:
    if max_horizon is None:
        mod1 = Lag1PlusOneModel()
        mod2 = Lag1PlusOneModel()
    else:
        mod1 = [Lag1PlusOneModel() for _ in range(max_horizon)]
        mod2 = [Lag1PlusOneModel() for _ in range(max_horizon)]
    ts.fit_transform(train, 'unique_id', 'ds', 'y', max_horizon=max_horizon)
    # each model gets the correct historic values
    preds = ts.predict(models={'mod1': mod1, 'mod2': mod2}, horizon=2)
    np.testing.assert_allclose(preds['mod1'], preds['mod2'])
    # idempotency
    preds2 = ts.predict(models={'mod1': mod1, 'mod2': mod2}, horizon=2)
    np.testing.assert_allclose(preds2['mod1'], preds2['mod2'])
    pd.testing.assert_frame_equal(preds, preds2)
# save & load
series = generate_daily_series(2, n_static_features=2)
ts = TimeSeries(
    freq='D',
    lags=[1, 2],
    date_features=['dayofweek'],
    lag_transforms={
        1: [RollingMean(1)]
    },
    target_transforms=[Differences([20])],
)
ts.fit_transform(series, 'unique_id', 'ds', 'y')
with tempfile.TemporaryDirectory() as tmpdir:
    fname = Path(tmpdir) / 'hi'
    ts.save(fname)
    ts2 = TimeSeries.load(fname)
preds = ts.predict({'model': NaiveModel()}, 10)
preds2 = ts2.predict({'model': NaiveModel()}, 10)
pd.testing.assert_frame_equal(preds, preds2)