%load_ext autoreload
%autoreload 2
FugueBackend

StatsForecast's computational efficiency can be traced to its two core components:

1. Its models, written with numba, which optimizes Python code to reach C speeds.
2. Its core.StatsForecast class, which enables distributed computing.

Here we use Fugue, a unified interface for Dask and Spark.
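Fugue's central primitive is `transform`, which runs a plain pandas function either locally or on a distributed engine. Below is a minimal, self-contained sketch of that idea; the `add_one` function and toy data are illustrative only and not part of StatsForecast.

import pandas as pd
from fugue import transform

def add_one(df: pd.DataFrame) -> pd.DataFrame:
    # plain pandas logic; Fugue handles partitioning and distribution
    return df.assign(y=df['y'] + 1)

data = pd.DataFrame({'unique_id': [0, 0, 1], 'y': [1.0, 2.0, 3.0]})

# engine=None executes locally on pandas; passing a SparkSession or a Dask
# client (with the corresponding Fugue backend installed) distributes the
# exact same function
transform(data, add_one, schema='*', partition={'by': 'unique_id'}, engine=None)

Switching the `engine` argument is the only change needed to move the same code from local pandas to Spark or Dask.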
from fastcore.test import test_eq
from nbdev.showdoc import add_docs, show_doc
import inspect
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
import cloudpickle
import fugue.api as fa
import numpy as np
import pandas as pd
from fugue import transform, DataFrame, FugueWorkflow, ExecutionEngine, AnyDataFrame
from fugue.collections.yielded import Yielded
from fugue.constants import FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT
from triad import Schema
from statsforecast.core import _StatsForecast, ParallelBackend, _id_as_idx, _param_descriptions, make_backend
from statsforecast.utils import ConformalIntervals
def _cotransform(
    df1: Any,
    df2: Any,
    using: Any,
    schema: Any = None,
    params: Any = None,
    partition: Any = None,
    engine: Any = None,
    engine_conf: Any = None,
    force_output_fugue_dataframe: bool = False,
    as_local: bool = False,
) -> Any:
    dag = FugueWorkflow(compile_conf={FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT: 0})

    # zip the two dataframes by partition and apply `using` to each pair
    src = dag.create_data(df1).zip(dag.create_data(df2), partition=partition)
    tdf = src.transform(
        using=using,
        schema=schema,
        params=params,
        pre_partition=partition,
    )
    tdf.yield_dataframe_as("result", as_local=as_local)
    dag.run(engine, conf=engine_conf)
    result = dag.yields["result"].result  # type: ignore
    if force_output_fugue_dataframe or isinstance(df1, (DataFrame, Yielded)):
        return result
    return result.as_pandas() if result.is_local else result.native  # type: ignore
class FugueBackend(ParallelBackend):
    """FugueBackend for Distributed Computation.
    [Source code](https://github.com/Nixtla/statsforecast/blob/main/statsforecast/distributed/fugue.py).

    This class uses the [Fugue](https://github.com/fugue-project/fugue) backend, capable of distributing
    computation on Spark, Dask and Ray without any rewrites.

    Parameters
    ----------
    engine : fugue.ExecutionEngine
        A selection between Spark, Dask, and Ray.
    conf : fugue.Config
        Engine configuration.
    **transform_kwargs
        Additional kwargs for Fugue's transform method.

    Notes
    -----
    A short introduction to Fugue, with examples on how to scale pandas code to Spark, Dask or Ray,
    is available [here](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes.html).
    """

    def __init__(
        self,
        engine: Any = None,
        conf: Any = None,
        **transform_kwargs: Any,
    ):
        self._engine = engine
        self._conf = conf
        self._transform_kwargs = dict(transform_kwargs)

    def __getstate__(self) -> Dict[str, Any]:
        # don't serialize the engine or its configuration when this
        # object is sent to the workers
        return {}
    def _forecast(
        self,
        *,
        df: pd.DataFrame,
        X_df: Optional[pd.DataFrame],
        models,
        fallback_model,
        freq,
        h,
        level,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
        fitted,
    ) -> Tuple[_StatsForecast, pd.DataFrame]:
        model = _StatsForecast(
            models=models,
            freq=freq,
            fallback_model=fallback_model,
            n_jobs=1,
        )
        result = model.forecast(
            df=df,
            h=h,
            X_df=X_df,
            level=level,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        if _id_as_idx():
            result = result.reset_index()
        return model, result
    def _forecast_noX(
        self,
        df: pd.DataFrame,
        *,
        models,
        fallback_model,
        freq,
        h,
        level,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> pd.DataFrame:
        _, result = self._forecast(
            df=df,
            X_df=None,
            models=models,
            fallback_model=fallback_model,
            freq=freq,
            h=h,
            level=level,
            fitted=False,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        return result
    def _forecast_noX_fitted(
        self,
        df: pd.DataFrame,
        *,
        models,
        fallback_model,
        freq,
        h,
        level,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> List[List[Any]]:
        model, result = self._forecast(
            df=df,
            X_df=None,
            models=models,
            fallback_model=fallback_model,
            freq=freq,
            h=h,
            level=level,
            fitted=True,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        fitted_vals = model.forecast_fitted_values()
        if _id_as_idx():
            fitted_vals = fitted_vals.reset_index()
        return [[cloudpickle.dumps(result), cloudpickle.dumps(fitted_vals)]]
    def _forecast_X(
        self,
        df: pd.DataFrame,
        X_df: pd.DataFrame,
        *,
        models,
        fallback_model,
        freq,
        h,
        level,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> pd.DataFrame:
        _, result = self._forecast(
            df=df,
            X_df=X_df,
            models=models,
            fallback_model=fallback_model,
            freq=freq,
            h=h,
            level=level,
            fitted=False,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        return result
    def _forecast_X_fitted(
        self,
        df: pd.DataFrame,
        X_df: pd.DataFrame,
        *,
        models,
        fallback_model,
        freq,
        h,
        level,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> List[List[Any]]:
        model, result = self._forecast(
            df=df,
            X_df=X_df,
            models=models,
            fallback_model=fallback_model,
            freq=freq,
            h=h,
            level=level,
            fitted=True,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        fitted_vals = model.forecast_fitted_values()
        if _id_as_idx():
            fitted_vals = fitted_vals.reset_index()
        return [[cloudpickle.dumps(result), cloudpickle.dumps(fitted_vals)]]
    def _get_output_schema(
        self,
        *,
        df,
        models,
        level,
        mode,
        id_col,
        time_col,
        target_col,
    ) -> Schema:
        keep_schema = fa.get_schema(df).extract([id_col, time_col])
        cols: List[Any] = []
        if level is None:
            level = []
        for model in models:
            has_levels = (
                "level" in inspect.signature(getattr(model, "forecast")).parameters
                and len(level) > 0
            )
            cols.append((repr(model), np.float32))
            if has_levels:
                cols.extend([(f"{repr(model)}-lo-{l}", np.float32) for l in reversed(level)])
                cols.extend([(f"{repr(model)}-hi-{l}", np.float32) for l in level])
        if mode == "cv":
            cols = [("cutoff", keep_schema[time_col].type), (target_col, np.float32)] + cols
        return keep_schema + Schema(cols)
    @staticmethod
    def _retrieve_forecast_df(items: List[List[Any]]) -> Iterable[pd.DataFrame]:
        for serialized_fcst_df, _ in items:
            yield cloudpickle.loads(serialized_fcst_df)

    @staticmethod
    def _retrieve_fitted_df(items: List[List[Any]]) -> Iterable[pd.DataFrame]:
        for _, serialized_fitted_df in items:
            yield cloudpickle.loads(serialized_fitted_df)
    def forecast(
        self,
        *,
        df: AnyDataFrame,
        freq: Union[str, int],
        models: List[Any],
        fallback_model: Optional[Any],
        X_df: Optional[AnyDataFrame],
        h: int,
        level: Optional[List[int]],
        fitted: bool,
        prediction_intervals: Optional[ConformalIntervals],
        id_col: str,
        time_col: str,
        target_col: str,
    ) -> Any:
) """Memory Efficient core.StatsForecast predictions with FugueBackend.
This method uses Fugue's transform function, in combination with
`core.StatsForecast`'s forecast to efficiently fit a list of StatsForecast models.
Parameters
----------
{df}
{freq}
{models}
{fallback_model}
{X_df}
{h}
{level}
{fitted}
{prediction_intervals}
{id_col}
{time_col}
{target_col}
Returns
-------
fcsts_df : pandas.DataFrame
DataFrame with `models` columns for point predictions and probabilistic predictions for all fitted `models`
References
----------
For more information check the
[Fugue's transform](https://fugue-tutorials.readthedocs.io/tutorials/beginner/transform.html)
tutorial.
The [core.StatsForecast's forecast](https://nixtla.github.io/statsforecast/core.html#statsforecast.forecast)
method documentation.
Or the list of available [StatsForecast's models](https://nixtla.github.io/statsforecast/src/core/models.html).
"""
        self._fcst_schema = self._get_output_schema(
            df=df,
            models=models,
            level=level,
            mode='forecast',
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        self._fitted_schema = self._fcst_schema + fa.get_schema(df).extract([target_col])
        tfm_schema = 'a:binary, b:binary' if fitted else self._fcst_schema
        params = dict(
            models=models,
            freq=freq,
            fallback_model=fallback_model,
            h=h,
            level=level,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        tfm_kwargs = dict(
            params=params,
            schema=tfm_schema,
            partition={"by": id_col},
            engine=self._engine,
            engine_conf=self._conf,
        )
        if not fitted:
            if X_df is None:
                res = transform(df, self._forecast_noX, **tfm_kwargs)
            else:
                res = _cotransform(df, X_df, self._forecast_X, **tfm_kwargs)
        else:
            if X_df is None:
                res_with_fitted = transform(df, self._forecast_noX_fitted, **tfm_kwargs)
            else:
                res_with_fitted = _cotransform(
                    df, X_df, self._forecast_X_fitted, **tfm_kwargs
                )
            # persisting here avoids recomputing everything
            # when retrieving the fitted values
            self._results = fa.persist(res_with_fitted)
            res = transform(
                self._results,
                FugueBackend._retrieve_forecast_df,
                schema=self._fcst_schema,
                engine=self._engine,
            )
        return res

    forecast.__doc__ = forecast.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
    def forecast_fitted_values(self):
        """Retrieve in-sample predictions."""
        if not hasattr(self, '_results'):
            raise ValueError('You must first call forecast with `fitted=True`.')
        return transform(
            self._results,
            FugueBackend._retrieve_fitted_df,
            schema=self._fitted_schema,
            engine=self._engine,
        )
    def _cv(
        self,
        df: pd.DataFrame,
        *,
        models,
        freq,
        fallback_model,
        h,
        n_windows,
        step_size,
        test_size,
        input_size,
        level,
        refit,
        fitted,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> pd.DataFrame:
        model = _StatsForecast(
            models=models,
            freq=freq,
            fallback_model=fallback_model,
            n_jobs=1,
        )
        result = model.cross_validation(
            df=df,
            h=h,
            n_windows=n_windows,
            step_size=step_size,
            test_size=test_size,
            input_size=input_size,
            level=level,
            fitted=fitted,
            refit=refit,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        if _id_as_idx():
            result = result.reset_index()
        return result
    def cross_validation(
        self,
        *,
        df: AnyDataFrame,
        freq: Union[str, int],
        models: List[Any],
        fallback_model: Optional[Any],
        h: int,
        n_windows: int,
        step_size: int,
        test_size: int,
        input_size: int,
        level: Optional[List[int]],
        refit: bool,
        fitted: bool,
        prediction_intervals: Optional[ConformalIntervals],
        id_col: str,
        time_col: str,
        target_col: str,
    ) -> Any:
) """Temporal Cross-Validation with core.StatsForecast and FugueBackend.
This method uses Fugue's transform function, in combination with
`core.StatsForecast`'s cross-validation to efficiently fit a list of StatsForecast
models through multiple training windows, in either chained or rolled manner.
`StatsForecast.models`' speed along with Fugue's distributed computation allow to
overcome this evaluation technique high computational costs. Temporal cross-validation
provides better model's generalization measurements by increasing the test's length
and diversity.
Parameters
----------
{df}
{freq}
{models}
{fallback_model}
{h}
{n_windows}
{step_size}
{test_size}
{input_size}
{level}
{refit}
{fitted}
{prediction_intervals}
{id_col}
{time_col}
{target_col}
Returns
-------
pandas.DataFrame
DataFrame, with `models` columns for point predictions and probabilistic predictions for all fitted `models`.
References
----------
The [core.StatsForecast's cross validation](https://nixtla.github.io/statsforecast/core.html#statsforecast.cross_validation)
method documentation.
[Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Temporal Cross-Validation"](https://otexts.com/fpp3/tscv.html).
"""
        schema = self._get_output_schema(
            df=df,
            models=models,
            level=level,
            mode='cv',
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        return transform(
            df,
            self._cv,
            params=dict(
                models=models,
                freq=freq,
                fallback_model=fallback_model,
                h=h,
                n_windows=n_windows,
                step_size=step_size,
                test_size=test_size,
                input_size=input_size,
                level=level,
                refit=refit,
                fitted=fitted,
                prediction_intervals=prediction_intervals,
                id_col=id_col,
                time_col=time_col,
                target_col=target_col,
            ),
            schema=schema,
            partition={"by": id_col},
            engine=self._engine,
            engine_conf=self._conf,
            **self._transform_kwargs,
        )

    cross_validation.__doc__ = cross_validation.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]
@make_backend.candidate(lambda obj, *args, **kwargs: isinstance(obj, ExecutionEngine))
def _make_fugue_backend(obj: ExecutionEngine, *args, **kwargs) -> ParallelBackend:
    return FugueBackend(obj, **kwargs)
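As a quick illustration of this dispatch, here is a minimal sketch using Fugue's local NativeExecutionEngine, chosen only because it ships with the core fugue package and needs no cluster: since `_make_fugue_backend` is registered as a `make_backend` candidate for `ExecutionEngine` objects, passing any Fugue engine yields a `FugueBackend`.

# minimal sketch: any fugue ExecutionEngine is dispatched to FugueBackend
from fugue import NativeExecutionEngine

backend = make_backend(NativeExecutionEngine())
assert isinstance(backend, FugueBackend)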
import os

from statsforecast.core import StatsForecast
from statsforecast.models import (
    AutoARIMA,
    AutoETS,
)
from statsforecast.utils import generate_series

os.environ['NIXTLA_ID_AS_COL'] = '1'

n_series = 4
horizon = 7

series = generate_series(n_series)

sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)

sf.cross_validation(df=series, h=horizon, step_size=24, n_windows=2, level=[90]).head()
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# make unique_id a column
series['unique_id'] = series['unique_id'].astype(str)

# convert to Spark
sdf = spark.createDataFrame(series)

# returns a Spark DataFrame
sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
sf.cross_validation(df=sdf, h=horizon, step_size=24, n_windows=2, level=[90]).show()
show_doc(FugueBackend, title_level=3)
show_doc(FugueBackend.forecast)
show_doc(FugueBackend.cross_validation)
Dask Distributed Predictions

Here we provide an example that uses Fugue to distribute StatsForecast predictions by executing the code on a Dask cluster.

To do this, we instantiate the FugueBackend class with a DaskExecutionEngine.
import dask.dataframe as dd
from dask.distributed import Client
from fugue_dask import DaskExecutionEngine
from statsforecast import StatsForecast
from statsforecast.models import Naive
from statsforecast.utils import generate_series

# Generate synthetic panel data
df = generate_series(10)
df['unique_id'] = df['unique_id'].astype(str)
df = dd.from_pandas(df, npartitions=10)

# Instantiate FugueBackend with DaskExecutionEngine
dask_client = Client()
engine = DaskExecutionEngine(dask_client=dask_client)
We simply instantiate StatsForecast as usual.

sf = StatsForecast(models=[Naive()], freq='D')
Distributed Forecast

For extremely fast distributed predictions, we use FugueBackend as a backend that operates like the original StatsForecast.forecast method. It takes as input a pandas.DataFrame with columns [unique_id, ds, y] and exogenous variables, where the ds (datestamp) column should be of a format expected by pandas. The y column must be numeric and represents the measurement we wish to forecast, while unique_id uniquely identifies each series in the panel data.
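For reference, a minimal sketch of the expected input layout (the values below are synthetic, for illustration only):

# illustrative input frame: one series, daily datestamps, numeric target
sample_input = pd.DataFrame({
    'unique_id': ['series_0'] * 3,
    'ds': pd.date_range('2023-01-01', periods=3, freq='D'),
    'y': [10.0, 11.5, 9.8],
})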
# Distributed predictions with FugueBackend.
sf.forecast(df=df, h=12).compute()
#| eval: false
# fallback model
class FailNaive:
    def forecast(self):
        pass
    def __repr__(self):
        return 'Naive'

sf = StatsForecast(models=[Naive()], freq='D', fallback_model=Naive())
dask_fcst = sf.forecast(df=df, h=12).compute()
fcst_stats = sf.forecast(df=df.compute(), h=12)

pd.testing.assert_frame_equal(
    (
        dask_fcst
        .sort_values(by=['unique_id', 'ds'])
        .reset_index(drop=True)
        .astype({'ds': 'datetime64[ns]', 'Naive': 'float32'})
    ),
    fcst_stats,
)
sf = StatsForecast(models=[Naive()], freq='D')
xx = sf.forecast(df=df, h=12, fitted=True).compute()
yy = sf.forecast_fitted_values().compute()
#| eval: false
# distributed exogenous regressors
class ReturnX:
    def __init__(self):
        pass

    def fit(self, y, X):
        return self

    def predict(self, h, X):
        mean = X
        return X

    def __repr__(self):
        return 'ReturnX'

    def forecast(self, y, h, X=None, X_future=None, fitted=False):
        return {'mean': X_future.flatten()}

    def new(self):
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b

df_w_ex = pd.DataFrame(
    {
        'ds': np.hstack([np.arange(10), np.arange(10)]),
        'y': np.random.rand(20),
        'x': np.arange(20, dtype=np.float32),
    },
    index=pd.Index([0] * 10 + [1] * 10, name='unique_id'),
).reset_index()
train_mask = df_w_ex['ds'] < 6
train_df = dd.from_pandas(df_w_ex[train_mask], npartitions=10)
test_df = df_w_ex[~train_mask]
xreg = dd.from_pandas(test_df.drop(columns='y').reset_index(drop=True), npartitions=10)
#| eval: false
# distributed exogenous regressors
fcst_x = StatsForecast(models=[ReturnX()], freq=1)
res = fcst_x.forecast(df=train_df, X_df=xreg, h=4).compute()
expected_res = xreg.rename(columns={'x': 'ReturnX'}).compute()

pd.testing.assert_frame_equal(
    (
        res
        .sort_values('unique_id')
        .reset_index(drop=True)
        .astype(expected_res.dtypes)
    ),
    expected_res,
)
Distributed Cross-Validation

For extremely fast distributed temporal cross-validation, we use the cross_validation method, which operates like the original StatsForecast.cross_validation method.

# Distributed cross-validation with FugueBackend.
sf.cross_validation(df=df, h=12, n_windows=2).compute()
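The returned frame contains a `cutoff` column marking the end of each training window, together with the actual `y` values, so the validation windows can be inspected directly. A minimal sketch, reusing `sf` and `df` from above:

# each distinct cutoff corresponds to one of the n_windows validation windows
cv_df = sf.cross_validation(df=df, h=12, n_windows=2).compute()
print(cv_df['cutoff'].unique())  # expect 2 distinct cutoffs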
#| eval: false
from statsforecast.models import Naive
from statsforecast.utils import generate_series

#| eval: false
# Generate synthetic panel data.
df = generate_series(10).reset_index()
df['unique_id'] = df['unique_id'].astype(str)
df = dd.from_pandas(df, npartitions=10)

# Distribute predictions.
sf = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = sf.forecast(df=df, h=12).compute().sort_values(['unique_id', 'ds']).reset_index(drop=True)
fcst_stats = sf.forecast(df=df.compute(), h=12).astype({"unique_id": str})

test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)
# Distribute cross-validation predictions.
sf = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = sf.cross_validation(df=df, h=12).compute().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)
fcst_stats = sf.cross_validation(df=df.compute(), h=12).astype({"unique_id": str})

test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)

# fallback model
class FailNaive:
    def forecast(self):
        pass
    def __repr__(self):
        return 'Naive'

# cross-validation fallback model
fcst = StatsForecast(models=[FailNaive()], freq='D', fallback_model=Naive())
fcst_fugue = fcst.cross_validation(df=df, h=12).compute().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)
fcst_stats = sf.cross_validation(df=df.compute(), h=12).astype({"unique_id": str})
test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)
#| eval: false
# test ray integration
import ray
from statsforecast.models import Naive
from statsforecast.utils import generate_series

#| eval: false
# Generate synthetic panel data.
df = generate_series(10).reset_index()
df['unique_id'] = df['unique_id'].astype(str)
df = ray.data.from_pandas(df).repartition(2)

# Distribute predictions.
sf = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = sf.forecast(df=df, h=12).to_pandas().sort_values(['unique_id', 'ds']).reset_index(drop=True)
fcst_stats = sf.forecast(df=df.to_pandas(), h=12).astype({"unique_id": str})

test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)

# Distribute cross-validation predictions.
fcst = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = fcst.cross_validation(df=df, h=12).to_pandas().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)
fcst_stats = sf.cross_validation(df=df.to_pandas(), h=12).astype({"unique_id": str})

test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)
# fallback model
class FailNaive:
    def forecast(self):
        pass
    def __repr__(self):
        return 'Naive'

# cross-validation fallback model
sf = StatsForecast(models=[FailNaive()], freq='D', fallback_model=Naive())
fcst_fugue = sf.cross_validation(df=df, h=12).to_pandas().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)
fcst_stats = sf.cross_validation(df=df.to_pandas(), h=12).astype({"unique_id": str})
test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)
#| eval: false
# distributed exogenous regressors
sf = StatsForecast(models=[ReturnX()], freq=1)
res = sf.forecast(df=train_df, X_df=xreg, h=4).compute()
expected_res = xreg.compute().rename(columns={'x': 'ReturnX'})
# we expect the unique_id and ds of the exogenous variables
pd.testing.assert_frame_equal(
    res.sort_values('unique_id').reset_index(drop=True).astype(expected_res.dtypes),
    expected_res,
)