FugueBackend

%load_ext autoreload
%autoreload 2

StatsForecast 的计算效率可以追溯到其两个核心组件: 1. 用 Numba 编写的 models,可将 Python 代码优化到接近 C 语言的速度。 2. 其 core.StatsForecast 类,支持分布式计算。

在这里,我们使用 Fugue,它为 Dask 和 Spark 提供了统一的接口。

from fastcore.test import test_eq
from nbdev.showdoc import add_docs, show_doc
import inspect
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union

import cloudpickle
import fugue.api as fa
import numpy as np
import pandas as pd
from fugue import transform, DataFrame, FugueWorkflow, ExecutionEngine, AnyDataFrame
from fugue.collections.yielded import Yielded
from fugue.constants import FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT
from triad import Schema

from statsforecast.core import _StatsForecast, ParallelBackend, _id_as_idx, _param_descriptions, make_backend
from statsforecast.utils import ConformalIntervals
def _cotransform(
    df1: Any,
    df2: Any,
    using: Any,
    schema: Any = None,
    params: Any = None,
    partition: Any = None,
    engine: Any = None,
    engine_conf: Any = None,
    force_output_fugue_dataframe: bool = False,
    as_local: bool = False,
) -> Any:
    """Zip two dataframes and apply a transformer to the zipped result.

    A `FugueWorkflow` is built in which `df1` and `df2` are zipped using
    `partition`, `using` is applied with `schema`/`params`, and the output is
    yielded under the name ``"result"`` before the workflow is run on `engine`
    with `engine_conf`.

    Returns
    -------
    The yielded Fugue dataframe when `force_output_fugue_dataframe` is set or
    `df1` is a Fugue `DataFrame`/`Yielded`; otherwise a pandas frame if the
    result is local, else the result's native object.
    """
    workflow = FugueWorkflow(compile_conf={FUGUE_CONF_WORKFLOW_EXCEPTION_INJECT: 0})

    left = workflow.create_data(df1)
    right = workflow.create_data(df2)
    zipped = left.zip(right, partition=partition)
    transformed = zipped.transform(
        using=using,
        schema=schema,
        params=params,
        pre_partition=partition,
    )
    transformed.yield_dataframe_as("result", as_local=as_local)
    workflow.run(engine, conf=engine_conf)

    output = workflow.yields["result"].result  # type: ignore
    keep_fugue = force_output_fugue_dataframe or isinstance(df1, (DataFrame, Yielded))
    if keep_fugue:
        return output
    if output.is_local:  # type: ignore
        return output.as_pandas()  # type: ignore
    return output.native  # type: ignore
class FugueBackend(ParallelBackend):
    """FugueBackend for Distributed Computation.
    [Source code](https://github.com/Nixtla/statsforecast/blob/main/statsforecast/distributed/fugue.py).

    This class uses [Fugue](https://github.com/fugue-project/fugue) backend capable of distributing 
    computation on Spark, Dask and Ray without any rewrites.

    Parameters
    ----------
    engine : fugue.ExecutionEngine
        A selection between Spark, Dask, and Ray.
    conf : fugue.Config
        Engine configuration.
    **transform_kwargs
        Additional kwargs for Fugue's transform method.

    Notes
    -----
    A short introduction to Fugue, with examples on how to scale pandas code to Spark, Dask or Ray
     is available [here](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes.html).
    """
    def __init__(
        self,
        engine: Any = None,
        conf: Any = None,
        **transform_kwargs: Any
    ):
        """Store the engine, its configuration and a copy of the extra kwargs."""
        # A fresh dict is built so the stored kwargs are not the caller's mapping.
        self._transform_kwargs = {**transform_kwargs}
        self._conf = conf
        self._engine = engine

    def __getstate__(self) -> Dict[str, Any]:
        """Return an empty state, so pickling captures no instance attributes.

        NOTE(review): presumably this keeps the engine and any cached results
        out of the payload when bound methods are sent to workers -- confirm.
        """
        empty_state: Dict[str, Any] = dict()
        return empty_state

    def _forecast(
        self,
        *,
        df: pd.DataFrame,
        X_df: Optional[pd.DataFrame],
        models,
        fallback_model,
        freq,
        h,
        level,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
        fitted,
    ) -> Tuple[_StatsForecast, pd.DataFrame]:
        """Run `_StatsForecast.forecast` on one pandas frame.

        A `_StatsForecast` is created with `n_jobs=1` and asked to forecast
        `h` steps. The wrapper is returned together with the forecast frame;
        the frame's index is reset when `_id_as_idx()` is true.
        """
        sf = _StatsForecast(
            models=models,
            freq=freq,
            fallback_model=fallback_model,
            n_jobs=1,
        )
        forecast_kwargs = dict(
            df=df,
            h=h,
            X_df=X_df,
            level=level,
            fitted=fitted,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        fcst_df = sf.forecast(**forecast_kwargs)
        return sf, (fcst_df.reset_index() if _id_as_idx() else fcst_df)

    def _forecast_noX(
        self,
        df: pd.DataFrame,
        *,
        models,
        fallback_model,
        freq,
        h,
        level,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> pd.DataFrame:
        """Forecast one partition without exogenous data or fitted values.

        Delegates to `_forecast` with `X_df=None` and `fitted=False` and
        returns only the forecast frame.
        """
        shared = dict(
            models=models,
            fallback_model=fallback_model,
            freq=freq,
            h=h,
            level=level,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        _sf, fcst_df = self._forecast(df=df, X_df=None, fitted=False, **shared)
        return fcst_df

    def _forecast_noX_fitted(
        self,
        df: pd.DataFrame,
        *,
        models,
        fallback_model,
        freq,
        h,
        level,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> List[List[Any]]:
        """Forecast one partition (no exogenous data) and keep the fitted values.

        Returns a single row holding two cloudpickle payloads: the forecast
        frame and the in-sample fitted values frame, in that order.
        """
        shared = dict(
            models=models,
            fallback_model=fallback_model,
            freq=freq,
            h=h,
            level=level,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        sf, fcst_df = self._forecast(df=df, X_df=None, fitted=True, **shared)
        insample_df = sf.forecast_fitted_values()
        if _id_as_idx():
            insample_df = insample_df.reset_index()
        row = [cloudpickle.dumps(fcst_df), cloudpickle.dumps(insample_df)]
        return [row]

    def _forecast_X(
        self,
        df: pd.DataFrame,
        X_df: pd.DataFrame,
        *,
        models,
        fallback_model,
        freq,
        h,
        level,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> pd.DataFrame:
        """Forecast one partition using the exogenous frame `X_df`.

        Delegates to `_forecast` with `fitted=False` and returns only the
        forecast frame.
        """
        shared = dict(
            models=models,
            fallback_model=fallback_model,
            freq=freq,
            h=h,
            level=level,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        _sf, fcst_df = self._forecast(df=df, X_df=X_df, fitted=False, **shared)
        return fcst_df

    def _forecast_X_fitted(
        self,
        df: pd.DataFrame,
        X_df: pd.DataFrame,
        *,
        models,
        fallback_model,
        freq,
        h,
        level,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> List[List[Any]]:
        """Forecast one partition with exogenous data and keep the fitted values.

        Returns a single row holding two cloudpickle payloads: the forecast
        frame and the in-sample fitted values frame, in that order.
        """
        shared = dict(
            models=models,
            fallback_model=fallback_model,
            freq=freq,
            h=h,
            level=level,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        sf, fcst_df = self._forecast(df=df, X_df=X_df, fitted=True, **shared)
        insample_df = sf.forecast_fitted_values()
        if _id_as_idx():
            insample_df = insample_df.reset_index()
        row = [cloudpickle.dumps(fcst_df), cloudpickle.dumps(insample_df)]
        return [row]

    def _get_output_schema(
        self,
        *,
        df,
        models,
        level,
        mode,
        id_col,
        time_col,
        target_col,
    ) -> Schema:
        """Build the schema of the frame produced by forecast or cross-validation.

        The id and time columns keep the types they have in `df`. Every model
        contributes one float32 column named after its `repr`; when the model's
        `forecast` accepts a `level` argument and levels were requested, `-lo-`
        columns (levels reversed) and `-hi-` columns are added. For
        ``mode == "cv"`` a `cutoff` column (typed like the time column) and a
        float32 target column are placed before the model columns.
        """
        id_time_schema = fa.get_schema(df).extract([id_col, time_col])
        levels = [] if level is None else level
        model_cols: List[Any] = []
        for model in models:
            accepts_level = "level" in inspect.signature(model.forecast).parameters
            emit_intervals = accepts_level and len(levels) > 0
            name = repr(model)
            model_cols.append((name, np.float32))
            if emit_intervals:
                model_cols += [(f"{name}-lo-{lv}", np.float32) for lv in reversed(levels)]
                model_cols += [(f"{name}-hi-{lv}", np.float32) for lv in levels]
        if mode == "cv":
            cv_cols = [("cutoff", id_time_schema[time_col].type), (target_col, np.float32)]
            model_cols = cv_cols + model_cols
        return id_time_schema + Schema(model_cols)

    @staticmethod
    def _retrieve_forecast_df(items: List[List[Any]]) -> Iterable[pd.DataFrame]:
        """Yield the unpickled first element (forecast frame) of every two-item row."""
        yield from (cloudpickle.loads(fcst_blob) for fcst_blob, _ in items)

    @staticmethod
    def _retrieve_fitted_df(items: List[List[Any]]) -> Iterable[pd.DataFrame]:
        """Yield the unpickled second element (fitted frame) of every two-item row."""
        yield from (cloudpickle.loads(fitted_blob) for _, fitted_blob in items)

    def forecast(
        self,
        *,
        df: AnyDataFrame,
        freq: Union[str, int],
        models: List[Any],
        fallback_model: Optional[Any],
        X_df: Optional[AnyDataFrame],
        h: int,
        level: Optional[List[int]],
        fitted: bool,
        prediction_intervals: Optional[ConformalIntervals],
        id_col: str,
        time_col: str,
        target_col: str,
    ) -> Any:
        """Memory Efficient core.StatsForecast predictions with FugueBackend.

        This method uses Fugue's transform function, in combination with 
        `core.StatsForecast`'s forecast to efficiently fit a list of StatsForecast models.

        Parameters
        ----------
        {df}
        {freq}
        {models}
        {fallback_model}
        {X_df}
        {h}
        {level}
        {fitted}
        {prediction_intervals}
        {id_col}
        {time_col}
        {target_col}

        Returns
        -------
        fcsts_df : pandas.DataFrame
            DataFrame with `models` columns for point predictions and probabilistic predictions for all fitted `models`
        
        References
        ----------
        For more information check the 
        [Fugue's transform](https://fugue-tutorials.readthedocs.io/tutorials/beginner/transform.html)
        tutorial.
        The [core.StatsForecast's forecast](https://nixtla.github.io/statsforecast/core.html#statsforecast.forecast)
        method documentation.
        Or the list of available [StatsForecast's models](https://nixtla.github.io/statsforecast/src/core/models.html).
        """
        self._fcst_schema = self._get_output_schema(
            df=df,
            models=models,
            level=level,
            mode='forecast',
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        # The fitted-values frame has the forecast columns plus the target column.
        self._fitted_schema = self._fcst_schema + fa.get_schema(df).extract([target_col])
        # With `fitted=True` every partition returns two pickled frames (forecast
        # and fitted values), so the intermediate schema is two binary columns.
        tfm_schema = 'a:binary, b:binary' if fitted else self._fcst_schema
        params = dict(
            models=models,
            freq=freq,
            fallback_model=fallback_model,
            h=h,
            level=level,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        # NOTE(review): `self._transform_kwargs` is not forwarded here, unlike in
        # `cross_validation` -- confirm whether that is intentional.
        tfm_kwargs = dict(
            params=params,
            schema=tfm_schema,
            partition={"by": id_col},
            engine=self._engine,
            engine_conf=self._conf,
        )
        if not fitted:
            if X_df is None:
                res = transform(df, self._forecast_noX, **tfm_kwargs)
            else:
                res = _cotransform(df, X_df, self._forecast_X, **tfm_kwargs)
        else:
            if X_df is None:
                res_with_fitted = transform(df, self._forecast_noX_fitted, **tfm_kwargs)
            else:
                res_with_fitted = _cotransform(
                    df, X_df, self._forecast_X_fitted, **tfm_kwargs
                )
            # Persisting here avoids recomputing everything when the fitted
            # values are retrieved later.
            self._results = fa.persist(res_with_fitted)
            # Pass the engine configuration as well, so this second stage runs
            # with the same engine settings as the first one.
            res = transform(
                self._results,
                FugueBackend._retrieve_forecast_df,
                schema=self._fcst_schema,
                engine=self._engine,
                engine_conf=self._conf,
            )
        return res

    # Fill the `{name}` placeholders of the docstring from `_param_descriptions`.
    forecast.__doc__ = forecast.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]

    def forecast_fitted_values(self):
        """Retrieve the in-sample (fitted) predictions.

        Unpickles the fitted-values frames from the results persisted by a
        previous `forecast` call made with `fitted=True`.

        Raises
        ------
        ValueError
            If no persisted results exist, i.e. `forecast` has not been called
            with `fitted=True` on this instance.
        """
        if not hasattr(self, '_results'):
            raise ValueError('You must first call forecast with `fitted=True`.')
        # Pass the engine configuration too, matching the transform calls made
        # in `forecast`.
        return transform(
            self._results,
            FugueBackend._retrieve_fitted_df,
            schema=self._fitted_schema,
            engine=self._engine,
            engine_conf=self._conf,
        )

    def _cv(
        self,
        df: pd.DataFrame,
        *,
        models,
        freq,
        fallback_model,
        h,
        n_windows,
        step_size,
        test_size,
        input_size,
        level,
        refit,
        fitted,
        prediction_intervals,
        id_col,
        time_col,
        target_col,
    ) -> pd.DataFrame:
        """Run `_StatsForecast.cross_validation` on one pandas frame.

        A `_StatsForecast` is created with `n_jobs=1`; the cross-validation
        frame is returned with its index reset when `_id_as_idx()` is true.
        """
        sf = _StatsForecast(
            models=models,
            freq=freq,
            fallback_model=fallback_model,
            n_jobs=1,
        )
        cv_kwargs = dict(
            df=df,
            h=h,
            n_windows=n_windows,
            step_size=step_size,
            test_size=test_size,
            input_size=input_size,
            level=level,
            fitted=fitted,
            refit=refit,
            prediction_intervals=prediction_intervals,
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        cv_df = sf.cross_validation(**cv_kwargs)
        return cv_df.reset_index() if _id_as_idx() else cv_df

    def cross_validation(
        self,
        *,
        df: AnyDataFrame,
        freq: Union[str, int],
        models: List[Any],
        fallback_model: Optional[Any],
        h: int,
        n_windows: int,
        step_size: int,
        test_size: int,
        input_size: int,
        level: Optional[List[int]],
        refit: bool,
        fitted: bool,
        prediction_intervals: Optional[ConformalIntervals],
        id_col: str,
        time_col: str,
        target_col: str,
    ) -> Any:
        """Temporal Cross-Validation with core.StatsForecast and FugueBackend.

        This method uses Fugue's transform function, in combination with 
        `core.StatsForecast`'s cross-validation to efficiently fit a list of StatsForecast 
        models through multiple training windows, in either chained or rolled manner.

        `StatsForecast.models`' speed along with Fugue's distributed computation allow to 
        overcome this evaluation technique high computational costs. Temporal cross-validation 
        provides better model's generalization measurements by increasing the test's length 
        and diversity.

        Parameters
        ----------
        {df}
        {freq}
        {models}
        {fallback_model}
        {h}
        {n_windows}
        {step_size}
        {test_size}
        {input_size}
        {level}
        {refit}
        {fitted}
        {prediction_intervals}
        {id_col}
        {time_col}
        {target_col}

        Returns
        -------
        pandas.DataFrame
            DataFrame, with `models` columns for point predictions and probabilistic predictions for all fitted `models`.
        
        References
        ----------
        The [core.StatsForecast's cross validation](https://nixtla.github.io/statsforecast/core.html#statsforecast.cross_validation)
        method documentation.
        [Rob J. Hyndman and George Athanasopoulos (2018). "Forecasting principles and practice, Temporal Cross-Validation"](https://otexts.com/fpp3/tscv.html).
        """
        cv_schema = self._get_output_schema(
            df=df,
            models=models,
            level=level,
            mode='cv',
            id_col=id_col,
            time_col=time_col,
            target_col=target_col,
        )
        # Keyword arguments handed to `_cv` for every partition.
        cv_params = {
            "models": models,
            "freq": freq,
            "fallback_model": fallback_model,
            "h": h,
            "n_windows": n_windows,
            "step_size": step_size,
            "test_size": test_size,
            "input_size": input_size,
            "level": level,
            "refit": refit,
            "fitted": fitted,
            "prediction_intervals": prediction_intervals,
            "id_col": id_col,
            "time_col": time_col,
            "target_col": target_col,
        }
        # One partition per series id; user-supplied transform kwargs go last.
        return transform(
            df,
            self._cv,
            params=cv_params,
            schema=cv_schema,
            partition={"by": id_col},
            engine=self._engine,
            engine_conf=self._conf,
            **self._transform_kwargs,
        )

    # Fill the `{name}` placeholders of the docstring from `_param_descriptions`.
    cross_validation.__doc__ = cross_validation.__doc__.format(**_param_descriptions)  # type: ignore[union-attr]


@make_backend.candidate(lambda obj, *args, **kwargs: isinstance(obj, ExecutionEngine))
def _make_fugue_backend(obj:ExecutionEngine, *args, **kwargs) -> ParallelBackend:
    """Wrap a Fugue `ExecutionEngine` in a `FugueBackend`.

    Registered as a `make_backend` candidate for `ExecutionEngine` instances.
    Keyword arguments are forwarded to `FugueBackend`; extra positional
    arguments are ignored.
    """
    backend = FugueBackend(obj, **kwargs)
    return backend
import os

from statsforecast.core import StatsForecast
from statsforecast.models import ( 
    AutoARIMA,
    AutoETS,
)
from statsforecast.utils import generate_series
# NOTE(review): presumably this makes statsforecast return the id as a column
# instead of the index (see `_id_as_idx`) -- confirm.
os.environ['NIXTLA_ID_AS_COL'] = '1'
n_series = 4
horizon = 7

# Synthetic panel with `n_series` series.
series = generate_series(n_series)

sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)

# Cross-validation on the pandas frame, for comparison with the Spark run below.
sf.cross_validation(df=series, h=horizon, step_size = 24,
    n_windows = 2, level=[90]).head()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

# Cast unique_id to string
series['unique_id'] = series['unique_id'].astype(str)

# Convert to Spark
sdf = spark.createDataFrame(series)
# Returns a Spark DataFrame
sf = StatsForecast(
    models=[AutoETS(season_length=7)],
    freq='D',
)
sf.cross_validation(df=sdf, h=horizon, step_size = 24,
    n_windows = 2, level=[90]).show()
# Render the documentation for the backend class and its two public methods.
show_doc(FugueBackend, title_level=3)
show_doc(FugueBackend.forecast)
show_doc(FugueBackend.cross_validation)

Dask 分布式预测

在这里,我们提供了一个使用 Fugue 在 Dask 集群中执行代码来分发 StatsForecast 预测的示例。

为此,我们实例化 FugueBackend 类,并使用 DaskExecutionEngine

import dask.dataframe as dd
from dask.distributed import Client
from fugue_dask import DaskExecutionEngine
from statsforecast import StatsForecast
from statsforecast.models import Naive
from statsforecast.utils import generate_series
# Generate synthetic panel data
df = generate_series(10)
df['unique_id'] = df['unique_id'].astype(str)
df = dd.from_pandas(df, npartitions=10)

# Create the DaskExecutionEngine on top of a Dask client
dask_client = Client()
engine = DaskExecutionEngine(dask_client=dask_client)

接下来,只需按常规方式实例化 StatsForecast 类即可。

# Plain StatsForecast instance; no engine is passed at construction time.
sf = StatsForecast(models=[Naive()], freq='D')

分布式预测

对于极快速的分布式预测,我们使用FugueBackend作为后端,操作方式类似于原始的StatsForecast.forecast方法。

它接受一个包含[unique_id,ds,y]和外生变量的pandas.DataFrame作为输入,其中ds(日期戳)列应为Pandas期望的格式。y列必须是数值型,表示我们希望预测的测量值。而unique_id则唯一标识面板数据中的系列。

# Distributed forecast with FugueBackend; `df` is the Dask dataframe built above.
sf.forecast(df=df, h=12).compute()
#| eval: false
# Fallback model.
# `FailNaive.forecast` accepts no arguments, so it presumably raises when called
# with data and forces the fallback model to be used; its repr matches Naive so
# the output column is still named 'Naive'.
class FailNaive:
    def forecast(self):
        pass
    def __repr__(self):
        return 'Naive'
# Use the failing model so that the fallback path is actually exercised.
sf = StatsForecast(models=[FailNaive()], freq='D', fallback_model=Naive())
dask_fcst = sf.forecast(df=df, h=12).compute()
fcst_stats = sf.forecast(df=df.compute(), h=12)
# The distributed result must match the local one after sorting and casting.
pd.testing.assert_frame_equal(
    (
        dask_fcst
        .sort_values(by=['unique_id', 'ds'])
        .reset_index(drop=True)
        .astype({'ds': 'datetime64[ns]', 'Naive': 'float32'})
    ),
    fcst_stats
)
# Forecast with `fitted=True`, then fetch the in-sample fitted values.
sf = StatsForecast(models=[Naive()], freq='D')
xx = sf.forecast(df=df, h=12, fitted=True).compute()
yy = sf.forecast_fitted_values().compute()
#| eval: false

# Distributed exogenous regressors
class ReturnX:
    """Test stub whose forecast echoes the future exogenous values."""

    def __init__(self):
        pass

    def fit(self, y, X):
        """Do nothing and return the instance itself."""
        return self

    def predict(self, h, X):
        """Return `X` unchanged."""
        return X

    def __repr__(self):
        return 'ReturnX'

    def forecast(self, y, h, X=None, X_future=None, fitted=False):
        """Return the flattened `X_future` as the mean forecast."""
        return {'mean': X_future.flatten()}

    def new(self):
        """Return a new instance of the same type sharing a copy of the attribute dict."""
        b = type(self).__new__(type(self))
        b.__dict__.update(self.__dict__)
        return b
    
# Two series (ids 0 and 1) with 10 time steps each and one exogenous column `x`.
df_w_ex = pd.DataFrame(
    {
        'ds': np.hstack([np.arange(10), np.arange(10)]),
        'y': np.random.rand(20),
        'x': np.arange(20, dtype=np.float32),
    },
    index=pd.Index([0] * 10 + [1] * 10, name='unique_id'),
).reset_index()
# Rows with ds < 6 form the training set; the rest supply the future exogenous values.
train_mask = df_w_ex['ds'] < 6
train_df = dd.from_pandas(df_w_ex[train_mask], npartitions=10)
test_df = df_w_ex[~train_mask]
xreg = dd.from_pandas(test_df.drop(columns='y').reset_index(drop=True), npartitions=10)
#| eval: false

# Distributed exogenous regressors
fcst_x = StatsForecast(models=[ReturnX()], freq=1)
res = fcst_x.forecast(df=train_df, 
                      X_df=xreg, 
                      h=4).compute()
# ReturnX echoes the future `x` values, so the forecast column should equal `x`.
expected_res = xreg.rename(columns={'x': 'ReturnX'}).compute()
pd.testing.assert_frame_equal(
    (
        res
        .sort_values('unique_id')
        .reset_index(drop=True)
        .astype(expected_res.dtypes)
    ),
    expected_res,
)

分布式交叉验证

对于极其快速的分布式时间序列交叉验证,我们使用 cross_validation 方法,该方法的操作类似于原始的 StatsForecast.cross_validation 方法。

# Distributed cross-validation with FugueBackend.
sf.cross_validation(df=df, h=12, n_windows=2).compute()
# eval: false
from statsforecast.models import Naive
from statsforecast.utils import generate_series
#| eval: false
# Generate synthetic panel data.
# NOTE(review): earlier cells read `unique_id` as a column straight from
# `generate_series`, so `reset_index()` here may add a stray `index` column -- confirm.
df = generate_series(10).reset_index()
df['unique_id'] = df['unique_id'].astype(str)
df = dd.from_pandas(df, npartitions=10)

# Distribute predictions.
sf = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = sf.forecast(df=df, h=12).compute().sort_values(['unique_id', 'ds']).reset_index(drop=True)
fcst_stats = sf.forecast(df=df.compute(), h=12).astype({"unique_id": str})
test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)

# Distribute cross-validation predictions.
sf = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = sf.cross_validation(df=df, h=12).compute().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)
fcst_stats = sf.cross_validation(df=df.compute(), h=12).astype({"unique_id": str})
test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)

# Fallback model
class FailNaive:
    def forecast(self):
        pass
    def __repr__(self):
        return 'Naive'
    
# Cross-validation with the fallback model; compared against the plain Naive run (`sf`).
fcst = StatsForecast(models=[FailNaive()], freq='D', fallback_model=Naive())
fcst_fugue = fcst.cross_validation(df=df, h=12).compute().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)
fcst_stats = sf.cross_validation(df=df.compute(), h=12).astype({"unique_id": str})
test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)
#| eval: false
# test ray integration
import ray
from statsforecast.models import Naive
from statsforecast.utils import generate_series
#| eval: false
# Generate synthetic panel data.
# NOTE(review): earlier cells read `unique_id` as a column straight from
# `generate_series`, so `reset_index()` here may add a stray `index` column -- confirm.
df = generate_series(10).reset_index()
df['unique_id'] = df['unique_id'].astype(str)
df = ray.data.from_pandas(df).repartition(2)

# Distribute predictions.
sf = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = sf.forecast(df=df, h=12).to_pandas().sort_values(['unique_id', 'ds']).reset_index(drop=True)
fcst_stats = sf.forecast(df=df.to_pandas(), h=12).astype({"unique_id": str})
test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)

# Distribute cross-validation predictions.
fcst = StatsForecast(models=[Naive()], freq='D')
fcst_fugue = fcst.cross_validation(df=df, h=12).to_pandas().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)
fcst_stats = sf.cross_validation(df=df.to_pandas(), h=12).astype({"unique_id": str})
test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)

# Fallback model
class FailNaive:
    def forecast(self):
        pass
    def __repr__(self):
        return 'Naive'
    
# Cross-validation with the fallback model; both sides use the FailNaive + fallback setup.
sf = StatsForecast(models=[FailNaive()], freq='D', fallback_model=Naive())
fcst_fugue = sf.cross_validation(df=df, h=12).to_pandas().sort_values(['unique_id', 'ds', 'cutoff']).reset_index(drop=True)
fcst_stats = sf.cross_validation(df=df.to_pandas(), h=12).astype({"unique_id": str})
test_eq(fcst_fugue.astype(fcst_stats.dtypes), fcst_stats)
#| eval: false

# Distributed exogenous regressors
# NOTE(review): `train_df` and `xreg` are the Dask frames built earlier, although
# this cell follows the Ray tests -- confirm this is intended.
sf = StatsForecast(models=[ReturnX()], freq=1)
res = sf.forecast(df=train_df, 
                  X_df=xreg, 
                  h=4).compute()
expected_res = xreg.compute().rename(columns={'x': 'ReturnX'})
# We expect the unique ids and ds of the exogenous frame, with `x` echoed as the forecast.
pd.testing.assert_frame_equal(
    res.sort_values('unique_id').reset_index(drop=True).astype(expected_res.dtypes), 
    expected_res
)

Give us a ⭐ on Github