%load_ext autoreload
%autoreload 2
Distributed forecast
import warnings
from fastcore.test import test_warns, test_eq, test_ne
from nbdev import show_doc
from sklearn import set_config
warnings.simplefilter('ignore', FutureWarning)
set_config(display='text')
Distributed pipeline wrapper
This interface has only been tested on Linux.
import copy
from collections import namedtuple
from typing import Any, Callable, Iterable, List, Optional
import cloudpickle
import fsspec
try:
    import dask.dataframe as dd
    DASK_INSTALLED = True
except ModuleNotFoundError:
    DASK_INSTALLED = False
import fugue
import fugue.api as fa
import numpy as np
import pandas as pd
import utilsforecast.processing as ufp
try:
    from pyspark.ml.feature import VectorAssembler
    from pyspark.sql import DataFrame as SparkDataFrame
    SPARK_INSTALLED = True
except ModuleNotFoundError:
    SPARK_INSTALLED = False
try:
    from lightgbm_ray import RayDMatrix
    from ray.data import Dataset as RayDataset
    RAY_INSTALLED = True
except ModuleNotFoundError:
    RAY_INSTALLED = False
from sklearn.base import clone

from mlforecast.core import (
    DateFeature,
    Freq,
    LagTransforms,
    Lags,
    TargetTransform,
    TimeSeries,
    _build_transform_name,
    _name_models,
)
from mlforecast.forecast import MLForecast
from mlforecast.grouped_array import GroupedArray

WindowInfo = namedtuple('WindowInfo', ['n_windows', 'window_size', 'step_size', 'i_window', 'input_size'])
class DistributedMLForecast:
    """Multi backend distributed pipeline"""

    def __init__(
        self,
        models,
        freq: Freq,
        lags: Optional[Lags] = None,
        lag_transforms: Optional[LagTransforms] = None,
        date_features: Optional[Iterable[DateFeature]] = None,
        num_threads: int = 1,
        target_transforms: Optional[List[TargetTransform]] = None,
        engine=None,
        num_partitions: Optional[int] = None,
        lag_transforms_namer: Optional[Callable] = None,
    ):
        """Create distributed forecast object
Parameters
----------
models : regressor or list of regressors
Models that will be trained and used to compute the forecasts.
freq : str or int, optional (default=None)
Pandas offset alias, e.g. 'D', 'W-THU' or integer denoting the frequency of the series.
lags : list of int, optional (default=None)
Lags of the target to use as features.
lag_transforms : dict of int to list of functions, optional (default=None)
Mapping of target lags to their transformations.
date_features : list of str or callable, optional (default=None)
Features computed from the dates. Can be pandas date attributes or functions that will take the dates as input.
num_threads : int (default=1)
Number of threads to use when computing the features.
target_transforms : list of transformers, optional(default=None)
Transformations that will be applied to the target before computing the features and restored after the forecasting step.
engine : fugue execution engine, optional (default=None)
Dask Client, Spark Session, etc to use for the distributed computation.
If None will infer depending on the input type.
num_partitions: number of data partitions to use, optional (default=None)
If None, the default partitions provided by the AnyDataFrame used
by the `fit` and `cross_validation` methods will be used. If a Ray
Dataset is provided and `num_partitions` is None, the partitioning
will be done by the `id_col`.
lag_transforms_namer : callable, optional(default=None)
Function that takes a transformation (either function or class), a lag and extra arguments and produces a name
"""
        if not isinstance(models, dict) and not isinstance(models, list):
            models = [models]
        if isinstance(models, list):
            model_names = _name_models([m.__class__.__name__ for m in models])
            models_with_names = dict(zip(model_names, models))
        else:
            models_with_names = models
        self.models = models_with_names
        if lag_transforms_namer is None:
            def name_without_dots(tfm, lag, *args):
                name = _build_transform_name(tfm, lag, args)
                return name.replace('.', '_')
            lag_transforms_namer = name_without_dots
        self._base_ts = TimeSeries(
            freq=freq,
            lags=lags,
            lag_transforms=lag_transforms,
            date_features=date_features,
            num_threads=num_threads,
            target_transforms=target_transforms,
            lag_transforms_namer=lag_transforms_namer,
        )
        self.engine = engine
        self.num_partitions = num_partitions

    def __repr__(self) -> str:
        return (
            f'{self.__class__.__name__}(models=[{", ".join(self.models.keys())}], '
            f"freq={self._base_ts.freq}, "
            f"lag_features={list(self._base_ts.transforms.keys())}, "
            f"date_features={self._base_ts.date_features}, "
            f"num_threads={self._base_ts.num_threads}, "
            f"engine={self.engine})"
        )

    @staticmethod
    def _preprocess_partition(
        part: pd.DataFrame,
        base_ts: TimeSeries,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        window_info: Optional[WindowInfo] = None,
        fit_ts_only: bool = False,
    ) -> List[List[Any]]:
        ts = copy.deepcopy(base_ts)
        if fit_ts_only:
            ts._fit(
                part,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
                static_features=static_features,
                keep_last_n=keep_last_n,
            )
            core_tfms = ts._get_core_lag_tfms()
            if core_tfms:
                # populate the stats needed for the updates
                ts._compute_transforms(core_tfms, updates_only=False)
            ts.as_numpy = False
            return [[cloudpickle.dumps(ts), cloudpickle.dumps(None), cloudpickle.dumps(None)]]
        if window_info is None:
            train = part
            valid = None
        else:
            max_dates = part.groupby(id_col, observed=True)[time_col].transform('max')
            cutoffs, train_mask, valid_mask = ufp._single_split(
                part,
                i_window=window_info.i_window,
                n_windows=window_info.n_windows,
                h=window_info.window_size,
                id_col=id_col,
                time_col=time_col,
                freq=ts.freq,
                max_dates=max_dates,
                step_size=window_info.step_size,
                input_size=window_info.input_size,
            )
            train = part[train_mask]
            valid_keep_cols = part.columns
            if static_features is not None:
                valid_keep_cols = valid_keep_cols.drop(static_features)
            valid = part.loc[valid_mask, valid_keep_cols].merge(cutoffs, on=id_col)
        transformed = ts.fit_transform(
            train,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
        )
        return [[cloudpickle.dumps(ts), cloudpickle.dumps(transformed), cloudpickle.dumps(valid)]]

    @staticmethod
    def _retrieve_df(items: List[List[Any]]) -> Iterable[pd.DataFrame]:
        for _, serialized_train, _ in items:
            yield cloudpickle.loads(serialized_train)

    def _preprocess_partitions(
        self,
        data: fugue.AnyDataFrame,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        window_info: Optional[WindowInfo] = None,
        fit_ts_only: bool = False,
    ) -> List[Any]:
        if self.num_partitions:
            partition = dict(by=id_col, num=self.num_partitions, algo='coarse')
        elif RAY_INSTALLED and isinstance(data, RayDataset):  # num_partitions is None but data is a RayDataset
            # we need to add this because, at the moment, ray doesn't support
            # partitioning a dataset based on a column.
            # if the dataset is partitioned with `.repartition(num_partitions)`
            # we get awkward results.
            partition = dict(by=id_col)
        else:
            partition = None
        res = fa.transform(
            data,
            DistributedMLForecast._preprocess_partition,
            params={
                'base_ts': self._base_ts,
                'id_col': id_col,
                'time_col': time_col,
                'target_col': target_col,
                'static_features': static_features,
                'dropna': dropna,
                'keep_last_n': keep_last_n,
                'window_info': window_info,
                'fit_ts_only': fit_ts_only,
            },
            schema='ts:binary,train:binary,valid:binary',
            engine=self.engine,
            as_fugue=True,
            partition=partition,
        )
        # persist so that we don't need to recompute this on predict
        return fa.persist(res, lazy=False, engine=self.engine, as_fugue=True)

    def _preprocess(
        self,
        data: fugue.AnyDataFrame,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        window_info: Optional[WindowInfo] = None,
    ) -> fugue.AnyDataFrame:
        self._base_ts.id_col = id_col
        self._base_ts.time_col = time_col
        self._base_ts.target_col = target_col
        self._base_ts.static_features = static_features
        self._base_ts.dropna = dropna
        self._base_ts.keep_last_n = keep_last_n
        self._partition_results = self._preprocess_partitions(
            data=data,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
            window_info=window_info,
        )
        base_schema = str(fa.get_schema(data))
        features_schema = ','.join(f'{feat}:double' for feat in self._base_ts.features)
        res = fa.transform(
            self._partition_results,
            DistributedMLForecast._retrieve_df,
            schema=f'{base_schema},{features_schema}',
            engine=self.engine,
        )
        return fa.get_native_as_df(res)

    def preprocess(
        self,
        df: fugue.AnyDataFrame,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
    ) -> fugue.AnyDataFrame:
        """Add the features to `data`.
Parameters
----------
df : dask, spark or ray DataFrame.
Series data in long format.
id_col : str (default='unique_id')
Column that identifies each serie.
time_col : str (default='ds')
Column that identifies each timestep, its values can be timestamps or integers.
target_col : str (default='y')
Column that contains the target.
static_features : list of str, optional (default=None)
Names of the features that are static and will be repeated when forecasting.
dropna : bool (default=True)
Drop rows with missing values produced by the transformations.
keep_last_n : int, optional (default=None)
Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it.
Returns
-------
result : same type as df
`df` with added features.
"""
        return self._preprocess(
            df,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
        )

    def _fit(
        self,
        data: fugue.AnyDataFrame,
        id_col: str,
        time_col: str,
        target_col: str,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        window_info: Optional[WindowInfo] = None,
    ) -> 'DistributedMLForecast':
        prep = self._preprocess(
            data,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
            window_info=window_info,
        )
        features = [x for x in fa.get_column_names(prep) if x not in {id_col, time_col, target_col}]
        self.models_ = {}
        if SPARK_INSTALLED and isinstance(data, SparkDataFrame):
            featurizer = VectorAssembler(inputCols=features, outputCol="features")
            train_data = featurizer.transform(prep)[target_col, "features"]
            for name, model in self.models.items():
                trained_model = model._pre_fit(target_col).fit(train_data)
                self.models_[name] = model.extract_local_model(trained_model)
        elif DASK_INSTALLED and isinstance(data, dd.DataFrame):
            X, y = prep[features], prep[target_col]
            for name, model in self.models.items():
                trained_model = clone(model).fit(X, y)
                self.models_[name] = trained_model.model_
        elif RAY_INSTALLED and isinstance(data, RayDataset):
            X = RayDMatrix(
                prep.select_columns(cols=features + [target_col]),
                label=target_col,
            )
            for name, model in self.models.items():
                trained_model = clone(model).fit(X, y=None)
                self.models_[name] = trained_model.model_
        else:
            raise NotImplementedError('Only spark, dask, and ray engines are supported.')
        return self

    def fit(
        self,
        df: fugue.AnyDataFrame,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
    ) -> 'DistributedMLForecast':
        """Apply the feature engineering and train the models.
Parameters
----------
df : dask, spark or ray DataFrame
Series data in long format.
id_col : str (default='unique_id')
Column that identifies each serie.
time_col : str (default='ds')
Column that identifies each timestep, its values can be timestamps or integers.
target_col : str (default='y')
Column that contains the target.
static_features : list of str, optional (default=None)
Names of the features that are static and will be repeated when forecasting.
dropna : bool (default=True)
Drop rows with missing values produced by the transformations.
keep_last_n : int, optional (default=None)
Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it.
Returns
-------
self : DistributedMLForecast
Forecast object with series values and trained models.
"""
        return self._fit(
            df,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
            static_features=static_features,
            dropna=dropna,
            keep_last_n=keep_last_n,
        )

    @staticmethod
    def _predict(
        items: List[List[Any]],
        models,
        horizon,
        before_predict_callback=None,
        after_predict_callback=None,
        X_df=None,
    ) -> Iterable[pd.DataFrame]:
        for serialized_ts, _, serialized_valid in items:
            valid = cloudpickle.loads(serialized_valid)
            ts = cloudpickle.loads(serialized_ts)
            res = ts.predict(
                models=models,
                horizon=horizon,
                before_predict_callback=before_predict_callback,
                after_predict_callback=after_predict_callback,
                X_df=X_df,
            )
            if valid is not None:
                res = res.merge(valid, how='left')
            yield res

    def _get_predict_schema(self) -> str:
        model_names = self.models.keys()
        models_schema = ','.join(f'{model_name}:double' for model_name in model_names)
        schema = f'{self._base_ts.id_col}:string,{self._base_ts.time_col}:datetime,' + models_schema
        return schema

    def predict(
        self,
        h: int,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        X_df: Optional[pd.DataFrame] = None,
        new_df: Optional[fugue.AnyDataFrame] = None,
    ) -> fugue.AnyDataFrame:
        """Compute the predictions for the next `horizon` steps.
Parameters
----------
h : int
Forecast horizon.
before_predict_callback : callable, optional (default=None)
Function to call on the features before computing the predictions.
This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
The series identifier is on the index.
after_predict_callback : callable, optional (default=None)
Function to call on the predictions before updating the targets.
This function will take a pandas Series with the predictions and should return another one with the same structure.
The series identifier is on the index.
X_df : pandas DataFrame, optional (default=None)
Dataframe with the future exogenous features. Should have the id column and the time column.
new_df : dask or spark DataFrame, optional (default=None)
Series data of new observations for which forecasts are to be generated.
This dataframe should have the same structure as the one used to fit the model, including any features and time series data.
If `new_df` is not None, the method will generate forecasts for the new observations.
Returns
-------
result : dask, spark or ray DataFrame
Predictions for each serie and timestep, with one column per model.
"""
        if new_df is not None:
            partition_results = self._preprocess_partitions(
                new_df,
                id_col=self._base_ts.id_col,
                time_col=self._base_ts.time_col,
                target_col=self._base_ts.target_col,
                static_features=self._base_ts.static_features,
                dropna=self._base_ts.dropna,
                keep_last_n=self._base_ts.keep_last_n,
                fit_ts_only=True,
            )
        else:
            partition_results = self._partition_results
        schema = self._get_predict_schema()
        if X_df is not None and not isinstance(X_df, pd.DataFrame):
            raise ValueError('`X_df` should be a pandas DataFrame')
        res = fa.transform(
            partition_results,
            DistributedMLForecast._predict,
            params={
                'models': self.models_,
                'horizon': h,
                'before_predict_callback': before_predict_callback,
                'after_predict_callback': after_predict_callback,
                'X_df': X_df,
            },
            schema=schema,
            engine=self.engine,
        )
        return fa.get_native_as_df(res)

    def cross_validation(
        self,
        df: fugue.AnyDataFrame,
        n_windows: int,
        h: int,
        id_col: str = 'unique_id',
        time_col: str = 'ds',
        target_col: str = 'y',
        step_size: Optional[int] = None,
        static_features: Optional[List[str]] = None,
        dropna: bool = True,
        keep_last_n: Optional[int] = None,
        refit: bool = True,
        before_predict_callback: Optional[Callable] = None,
        after_predict_callback: Optional[Callable] = None,
        input_size: Optional[int] = None,
    ) -> fugue.AnyDataFrame:
        """Perform time series cross validation.
Creates `n_windows` splits where each window has `h` test periods,
trains the models, computes the predictions and merges the actuals.
Parameters
----------
df : dask, spark or ray DataFrame
Series data in long format.
n_windows : int
Number of windows to evaluate.
h : int
Number of test periods in each window.
id_col : str (default='unique_id')
Column that identifies each serie.
time_col : str (default='ds')
Column that identifies each timestep, its values can be timestamps or integers.
target_col : str (default='y')
Column that contains the target.
step_size : int, optional (default=None)
Step size between each cross validation window. If None it will be equal to `h`.
static_features : list of str, optional (default=None)
Names of the features that are static and will be repeated when forecasting.
dropna : bool (default=True)
Drop rows with missing values produced by the transformations.
keep_last_n : int, optional (default=None)
Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it.
refit : bool (default=True)
Retrain model for each cross validation window.
If False, the models are trained at the beginning and then used to predict each window.
before_predict_callback : callable, optional (default=None)
Function to call on the features before computing the predictions.
This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure.
The series identifier is on the index.
after_predict_callback : callable, optional (default=None)
Function to call on the predictions before updating the targets.
This function will take a pandas Series with the predictions and should return another one with the same structure.
The series identifier is on the index.
input_size : int, optional (default=None)
Maximum training samples per serie in each window. If None, will use an expanding window.
Returns
-------
result : dask, spark or ray DataFrame
Predictions for each window with the series id, timestamp, target value and predictions from each model.
"""
        self.cv_models_ = []
        results = []
        for i in range(n_windows):
            window_info = WindowInfo(n_windows, h, step_size, i, input_size)
            if refit or i == 0:
                self._fit(
                    df,
                    id_col=id_col,
                    time_col=time_col,
                    target_col=target_col,
                    static_features=static_features,
                    dropna=dropna,
                    keep_last_n=keep_last_n,
                    window_info=window_info,
                )
                self.cv_models_.append(self.models_)
                partition_results = self._partition_results
            elif not refit:
                partition_results = self._preprocess_partitions(
                    df,
                    id_col=id_col,
                    time_col=time_col,
                    target_col=target_col,
                    static_features=static_features,
                    dropna=dropna,
                    keep_last_n=keep_last_n,
                    window_info=window_info,
                )
            schema = self._get_predict_schema() + f',cutoff:datetime,{self._base_ts.target_col}:double'
            preds = fa.transform(
                partition_results,
                DistributedMLForecast._predict,
                params={
                    'models': self.models_,
                    'horizon': h,
                    'before_predict_callback': before_predict_callback,
                    'after_predict_callback': after_predict_callback,
                },
                schema=schema,
                engine=self.engine,
            )
            results.append(fa.get_native_as_df(preds))
        return fa.union(*results)

    @staticmethod
    def _save_ts(items: List[List[Any]], path: str) -> Iterable[pd.DataFrame]:
        for serialized_ts, _, _ in items:
            ts = cloudpickle.loads(serialized_ts)
            first_uid = ts.uids[0]
            last_uid = ts.uids[-1]
            ts.save(f'{path}/ts_{first_uid}-{last_uid}.pkl')
            yield pd.DataFrame({'x': [True]})

    def save(self, path: str) -> None:
        """Save forecast object

        Parameters
        ----------
        path : str
            Directory where artifacts will be stored."""
        dummy_df = fa.transform(
            self._partition_results,
            DistributedMLForecast._save_ts,
            schema='x:bool',
            params={'path': path},
            engine=self.engine,
        )
        # trigger computation
        dummy_df.as_pandas()
        with fsspec.open(f'{path}/models.pkl', 'wb') as f:
            cloudpickle.dump(self.models_, f)
        self._base_ts.save(f'{path}/_base_ts.pkl')

    @staticmethod
    def _load_ts(paths: List[List[Any]], protocol: str) -> Iterable[pd.DataFrame]:
        for [path] in paths:
            ts = TimeSeries.load(path, protocol=protocol)
            yield pd.DataFrame(
                {
                    'ts': [cloudpickle.dumps(ts)],
                    'train': [cloudpickle.dumps(None)],
                    'valid': [cloudpickle.dumps(None)],
                }
            )

    @staticmethod
    def load(path: str, engine) -> 'DistributedMLForecast':
        """Load forecast object
Parameters
----------
path : str
Directory with saved artifacts.
engine : fugue execution engine
Dask Client, Spark Session, etc to use for the distributed computation.
"""
        fs, _, paths = fsspec.get_fs_token_paths(f'{path}/ts*')
        protocol = fs.protocol
        if isinstance(protocol, tuple):
            protocol = protocol[0]
        names_df = pd.DataFrame({'path': paths})
        partition_results = fa.transform(
            names_df,
            DistributedMLForecast._load_ts,
            schema='ts:binary,train:binary,valid:binary',
            partition='per_row',
            params={'protocol': protocol},
            engine=engine,
            as_fugue=True,
        )
        with fsspec.open(f'{path}/models.pkl', 'rb') as f:
            models = cloudpickle.load(f)
        base_ts = TimeSeries.load(f'{path}/_base_ts.pkl')
        fcst = DistributedMLForecast(models=models, freq=base_ts.freq)
        fcst._base_ts = base_ts
        fcst._partition_results = fa.persist(
            partition_results, lazy=False, engine=engine, as_fugue=True
        )
        fcst.models_ = models
        fcst.engine = engine
        fcst.num_partitions = len(paths)
        return fcst

    @staticmethod
    def _update(items: List[List[Any]], new_df) -> Iterable[List[Any]]:
        for serialized_ts, serialized_transformed, serialized_valid in items:
            ts = cloudpickle.loads(serialized_ts)
            partition_mask = ufp.is_in(new_df[ts.id_col], ts.uids)
            partition_df = ufp.filter_with_mask(new_df, partition_mask)
            ts.update(partition_df)
            yield [cloudpickle.dumps(ts), serialized_transformed, serialized_valid]

    def update(self, df: pd.DataFrame) -> None:
        """Update the values of the stored series.

        Parameters
        ----------
        df : pandas DataFrame
            Dataframe with new observations."""
        if not isinstance(df, pd.DataFrame):
            raise ValueError("`df` must be a pandas DataFrame.")
        res = fa.transform(
            self._partition_results,
            DistributedMLForecast._update,
            params={"new_df": df},
            schema="ts:binary,train:binary,valid:binary",
            engine=self.engine,
            as_fugue=True,
        )
        self._partition_results = fa.persist(res)

    def to_local(self) -> MLForecast:
        """Convert this distributed forecast object into a local one
This pulls all the data from the remote machines, so you have to be sure that
it fits in the scheduler/driver. If you're not sure use the save method instead.
Returns
-------
MLForecast
Local forecast object."""
        serialized_ts = fa.select_columns(
            self._partition_results,
            columns=['ts'],
            as_fugue=True,
        ).as_pandas()['ts'].tolist()
        all_ts = [cloudpickle.loads(ts) for ts in serialized_ts]
        # sort by the ids (these should already be sorted within each partition)
        all_ts = sorted(all_ts, key=lambda ts: ts.uids[0])

        # combine attributes. since fugue runs on pandas these are all pandas-based.
        # we use utilsforecast here in case we add support for polars in the future.
        def possibly_concat_indices(collection):
            items_are_indices = isinstance(collection[0], pd.Index)
            if items_are_indices:
                collection = [pd.Series(item) for item in collection]
            combined = ufp.vertical_concat(collection)
            if items_are_indices:
                combined = pd.Index(combined)
            return combined

        def combine_target_tfms(by_partition):
            if by_partition[0] is None:
                return None
            by_transform = [
                [part[i] for part in by_partition]
                for i in range(len(by_partition[0]))
            ]
            out = []
            for tfms in by_transform:
                out.append(tfms[0].stack(tfms))
            return out

        def combine_core_lag_tfms(by_partition):
            by_transform = [
                (name, [part[name] for part in by_partition])
                for name in by_partition[0].keys()
            ]
            out = {}
            for name, partition_tfms in by_transform:
                out[name] = partition_tfms[0].stack(partition_tfms)
            return out

        uids = possibly_concat_indices([ts.uids for ts in all_ts])
        last_dates = possibly_concat_indices([ts.last_dates for ts in all_ts])
        statics = ufp.vertical_concat([ts.static_features_ for ts in all_ts])
        combined_target_tfms = combine_target_tfms(
            [ts.target_transforms for ts in all_ts]
        )
        combined_core_lag_tfms = combine_core_lag_tfms(
            [ts._get_core_lag_tfms() for ts in all_ts]
        )
        sizes = np.hstack([np.diff(ts.ga.indptr) for ts in all_ts])
        data = np.hstack([ts.ga.data for ts in all_ts])
        indptr = np.append(0, sizes).cumsum()
        if isinstance(uids, pd.Index):
            uids_idx = uids
        else:
            # uids is a polars series
            uids_idx = pd.Index(uids)
        if not uids_idx.is_monotonic_increasing:
            # this seems to only happen with ray.
            # we have to sort all of the data related to the series.
            sort_idxs = uids_idx.argsort()
            uids = uids[sort_idxs]
            last_dates = last_dates[sort_idxs]
            statics = ufp.take_rows(statics, sort_idxs)
            statics = ufp.drop_index_if_pandas(statics)
            for tfm in combined_core_lag_tfms.values():
                tfm._core_tfm = tfm._core_tfm.take(sort_idxs)
            if combined_target_tfms is not None:
                combined_target_tfms = [
                    tfm.take(sort_idxs) for tfm in combined_target_tfms
                ]
            old_data = data.copy()
            old_indptr = indptr.copy()
            indptr = np.append(0, sizes[sort_idxs]).cumsum()
            # this loop takes 500ms for 100,000 series with sizes between 500 and 2,000,
            # so it may not be a big bottleneck, but try to implement this in core
            for i, sort_idx in enumerate(sort_idxs):
                old_slice = slice(old_indptr[sort_idx], old_indptr[sort_idx + 1])
                new_slice = slice(indptr[i], indptr[i + 1])
                data[new_slice] = old_data[old_slice]
        ga = GroupedArray(data, indptr)
        # all other attributes should be the same, so we just override the first ts
        ts = all_ts[0]
        ts.uids = uids
        ts.last_dates = last_dates
        ts.ga = ga
        ts.static_features_ = statics
        ts.transforms.update(combined_core_lag_tfms)
        ts.target_transforms = combined_target_tfms
        fcst = MLForecast(models=self.models_, freq=ts.freq)
        fcst.ts = ts
        fcst.models_ = self.models_
        return fcst
show_doc(DistributedMLForecast)
DistributedMLForecast
DistributedMLForecast (models, freq:Union[int,str], lags:Optional[Iterable[int]]=None, lag_transforms:Optional[Dict[int,List[Union[Callable,Tuple[Callable,Any]]]]]=None, date_features:Optional[Iterable[Union[str,Callable]]]=None, num_threads:int=1, target_transforms:Optional[List[Union[mlforecast.target_transforms.BaseTargetTransform,mlforecast.target_transforms._BaseGroupedArrayTargetTransform]]]=None, engine=None, num_partitions:Optional[int]=None, lag_transforms_namer:Optional[Callable]=None)
Multi backend distributed pipeline
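As a quick usage sketch (not part of the exported module), the object is built like a local MLForecast but with distributed model wrappers. The snippet below assumes a local Dask cluster, `lightgbm` installed, and the synthetic data helper from `mlforecast.utils`; the names `client`, `series` and `ddf` are only for illustration.

import dask.dataframe as dd
from dask.distributed import Client
from mlforecast.distributed.models.dask.lgb import DaskLGBMForecast
from mlforecast.utils import generate_daily_series

client = Client(n_workers=2, threads_per_worker=1)  # local cluster, for the sketch only
series = generate_daily_series(n_series=100, min_length=50, max_length=100)
ddf = dd.from_pandas(series, npartitions=4)  # distributed view of the series

fcst = DistributedMLForecast(
    models=[DaskLGBMForecast()],
    freq='D',
    lags=[7, 14],
    date_features=['dayofweek'],
    num_threads=1,
)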
show_doc(DistributedMLForecast.fit)
DistributedMLForecast.fit
DistributedMLForecast.fit (df:~AnyDataFrame, id_col:str='unique_id', time_col:str='ds', target_col:str='y', static_features:Optional[List[str]]=None, dropna:bool=True, keep_last_n:Optional[int]=None)
Apply the feature engineering and train the models.
Type | Default | Details | |
---|---|---|---|
df | AnyDataFrame | Series data in long format. | |
id_col | str | unique_id | Column that identifies each serie. |
time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |
target_col | str | y | Column that contains the target. |
static_features | Optional | None | Names of the features that are static and will be repeated when forecasting. |
dropna | bool | True | Drop rows with missing values produced by the transformations. |
keep_last_n | Optional | None | Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it. |
Returns | DistributedMLForecast | Forecast object with series values and trained models. |
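Continuing the sketch above, fitting only needs the distributed dataframe; the id, time and target columns default to `unique_id`, `ds` and `y`.

fcst.fit(ddf)  # engine inferred from the dask dataframe; one model trained per entry in `models`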
show_doc(DistributedMLForecast.predict)
DistributedMLForecast.predict
DistributedMLForecast.predict (h:int, before_predict_callback:Optional[Callable]=None, after_predict_callback:Optional[Callable]=None, X_df:Optional[pandas.core.frame.DataFrame]=None, new_df:Optional[~AnyDataFrame]=None)
Compute the predictions for the next horizon steps.
Type | Default | Details | |
---|---|---|---|
h | int | Forecast horizon. | |
before_predict_callback | Optional | None | Function to call on the features before computing the predictions. This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure. The series identifier is on the index. |
after_predict_callback | Optional | None | Function to call on the predictions before updating the targets. This function will take a pandas Series with the predictions and should return another one with the same structure. The series identifier is on the index. |
X_df | Optional | None | Dataframe with the future exogenous features. Should have the id column and the time column. |
new_df | Optional | None | Series data of new observations for which forecasts are to be generated. This dataframe should have the same structure as the one used to fit the model, including any features and time series data. If new_df is not None, the method will generate forecasts for the new observations. |
Returns | AnyDataFrame | Predictions for each serie and timestep, with one column per model. |
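For example, continuing the Dask sketch, the predictions come back as a distributed dataframe that can be collected on the driver:

preds = fcst.predict(14)       # dask dataframe: unique_id, ds and one column per model
preds_local = preds.compute()  # collect to pandas for inspection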
show_doc(DistributedMLForecast.save)
DistributedMLForecast.save
DistributedMLForecast.save (path:str)
Save forecast object
Type | Details | |
---|---|---|
path | str | Directory where artifacts will be stored. |
Returns | None |
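A minimal sketch of saving, assuming the fitted `fcst` from above; the directory name is arbitrary and may also be a remote path supported by `fsspec`:

fcst.save('distributed_artifacts')  # writes one TimeSeries pickle per partition plus models.pkl and _base_ts.pkl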
show_doc(DistributedMLForecast.load)
DistributedMLForecast.load
DistributedMLForecast.load (path:str, engine)
Load forecast object
Type | Details | |
---|---|---|
path | str | Directory with saved artifacts. |
engine | fugue execution engine | Dask Client, Spark Session, etc to use for the distributed computation. |
Returns | DistributedMLForecast |
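Loading requires the execution engine explicitly, since it cannot be inferred from a path. Continuing the Dask sketch (with the hypothetical directory used above):

fcst2 = DistributedMLForecast.load('distributed_artifacts', engine=client)
preds2 = fcst2.predict(14)  # should match the predictions of the original object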
show_doc(DistributedMLForecast.update)
DistributedMLForecast.update
DistributedMLForecast.update (df:pandas.core.frame.DataFrame)
Update the values of the stored series.
Type | Details | |
---|---|---|
df | DataFrame | Dataframe with new observations. |
Returns | None |
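A small sketch of an update, assuming the `series` and `fcst` objects from the Dask sketch; the new rows are a plain pandas dataframe with the id, time and target columns:

import pandas as pd

last_dates = series.groupby('unique_id', observed=True)['ds'].max().reset_index()
new_rows = last_dates.head(1).assign(ds=lambda d: d['ds'] + pd.Timedelta(days=1), y=1.0)
fcst.update(new_rows)  # the new values become the base for subsequent predictions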
show_doc(DistributedMLForecast.to_local)
DistributedMLForecast.to_local
DistributedMLForecast.to_local ()
*Convert this distributed forecast object into a local one
This pulls all the data from the remote machines, so you have to be sure that it fits in the scheduler/driver. If you’re not sure use the save method instead.*
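Continuing the sketch, converting to a local object gathers every partition's series into a single MLForecast, which can then predict without the cluster:

local_fcst = fcst.to_local()          # regular MLForecast with all series in driver memory
local_preds = local_fcst.predict(14)  # pandas dataframe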
show_doc(DistributedMLForecast.preprocess)
DistributedMLForecast.preprocess
DistributedMLForecast.preprocess (df:~AnyDataFrame, id_col:str='unique_id', time_col:str='ds', target_col:str='y', static_features:Optional[List[str]]=None, dropna:bool=True, keep_last_n:Optional[int]=None)
Add the features to data.
Type | Default | Details | |
---|---|---|---|
df | AnyDataFrame | Series data in long format. | |
id_col | str | unique_id | Column that identifies each serie. |
time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |
target_col | str | y | Column that contains the target. |
static_features | Optional | None | Names of the features that are static and will be repeated when forecasting. |
dropna | bool | True | Drop rows with missing values produced by the transformations. |
keep_last_n | Optional | None | Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it. |
Returns | AnyDataFrame | df with added features. |
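As a sketch, preprocessing returns the same distributed type with the lag and date features appended, which is useful for inspecting the training matrix (continuing the Dask example):

features_ddf = fcst.preprocess(ddf)  # same type as `ddf`, with one extra column per feature
features_ddf.head()                  # small sample on the driver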
show_doc(DistributedMLForecast.cross_validation)
DistributedMLForecast.cross_validation
DistributedMLForecast.cross_validation (df:~AnyDataFrame, n_windows:int, h:int, id_col:str='unique_id', time_col:str='ds', target_col:str='y', step_size:Optional[int]=None, static_features:Optional[List[str]]=None, dropna:bool=True, keep_last_n:Optional[int]=None, refit:bool=True, before_predict_callback:Optional[Callable]=None, after_predict_callback:Optional[Callable]=None, input_size:Optional[int]=None)
Perform time series cross validation. Creates n_windows splits where each window has h test periods, trains the models, computes the predictions and merges the actuals.
Type | Default | Details | |
---|---|---|---|
df | AnyDataFrame | Series data in long format. | |
n_windows | int | Number of windows to evaluate. | |
h | int | Number of test periods in each window. | |
id_col | str | unique_id | Column that identifies each serie. |
time_col | str | ds | Column that identifies each timestep, its values can be timestamps or integers. |
target_col | str | y | Column that contains the target. |
step_size | Optional | None | Step size between each cross validation window. If None it will be equal to h . |
static_features | Optional | None | Names of the features that are static and will be repeated when forecasting. |
dropna | bool | True | Drop rows with missing values produced by the transformations. |
keep_last_n | Optional | None | Keep only these many records from each serie for the forecasting step. Can save time and memory if your features allow it. |
refit | bool | True | Retrain model for each cross validation window. If False, the models are trained at the beginning and then used to predict each window. |
before_predict_callback | Optional | None | Function to call on the features before computing the predictions. This function will take the input dataframe that will be passed to the model for predicting and should return a dataframe with the same structure. The series identifier is on the index. |
after_predict_callback | Optional | None | Function to call on the predictions before updating the targets. This function will take a pandas Series with the predictions and should return another one with the same structure. The series identifier is on the index. |
input_size | Optional | None | Maximum training samples per serie in each window. If None, will use an expanding window. |
Returns | AnyDataFrame | Predictions for each window with the series id, timestamp, target value and predictions from each model. |
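Continuing the Dask sketch, cross validation returns the predictions for every window along with the cutoff and the actual target value; the result is a distributed dataframe, collected here only for inspection:

cv_res = fcst.cross_validation(ddf, n_windows=2, h=14)
cv_local = cv_res.compute()  # columns: unique_id, ds, one per model, cutoff and y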