快速开始(分布式)

%load_ext autoreload
%autoreload 2

使用MLForecast进行分布式训练的最小示例

DistributedMLForecast 类是一个高级抽象,封装了管道中的所有步骤(预处理、拟合模型和计算预测),并以分布式方式应用这些步骤。

使用 DistributedMLForecast(而不是 MLForecast)所需的不同事项包括:

  1. 你需要搭建一个集群。我们目前支持 dask、ray 和 spark。
  2. 你的数据需要是一个分布式集合(dask、ray 或 spark 数据框)。
  3. 你需要使用在你选择的框架中实现分布式训练的模型,例如 spark 中的 LightGBM 的 SynapseML。
import platform
import sys
import tempfile

import matplotlib.pyplot as plt
import git
import numpy as np
import pandas as pd
import s3fs
from sklearn.base import BaseEstimator

from mlforecast.distributed import DistributedMLForecast
from mlforecast.lag_transforms import ExpandingMean, ExponentiallyWeightedMean, RollingMean
from mlforecast.target_transforms import Differences
from mlforecast.utils import generate_daily_series, generate_prices_for_series

Dask

import dask.dataframe as dd
from dask.distributed import Client

客户端设置

client = Client(n_workers=2, threads_per_worker=1)

在这里,我们定义了一个连接到 dask.distributed.LocalCluster 的客户端,但它也可以是任何其他类型的集群。

数据设置

对于dask,数据必须是一个 dask.dataframe.DataFrame。你需要确保每个时间序列只在一个分区中,并且建议你的分区数量与工作节点数相同。如果你的分区数量超过工作节点,请确保将 num_threads 设置为1,以避免嵌套的并行性。

所需的输入格式与 MLForecast 相同,只不过它是一个 dask.dataframe.DataFrame 而不是 pandas.Dataframe

series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False, min_length=500, max_length=1_000)
npartitions = 10
partitioned_series = dd.from_pandas(series.set_index('unique_id'), npartitions=npartitions)  # 确保我们按 id_col 进行拆分
partitioned_series = partitioned_series.map_partitions(lambda df: df.reset_index())
partitioned_series['unique_id'] = partitioned_series['unique_id'].astype(str)  # 目前无法处理分类数据
partitioned_series
Dask DataFrame Structure:
unique_id ds y static_0 static_1
npartitions=10
id_00 object datetime64[ns] float64 int64 int64
id_10 ... ... ... ... ...
... ... ... ... ... ...
id_90 ... ... ... ... ...
id_99 ... ... ... ... ...
Dask Name: assign, 5 expressions

模型

为了进行分布式预测,我们需要使用能够通过dask进行分布式训练的模型。目前的实现有DaskLGBMForecastDaskXGBForecast,它们只是对原生实现的封装。

from mlforecast.distributed.models.dask.lgb import DaskLGBMForecast
from mlforecast.distributed.models.dask.xgb import DaskXGBForecast
models = [DaskXGBForecast(random_state=0), DaskLGBMForecast(random_state=0)]

训练

一旦我们有了模型,我们就实例化一个 DistributedMLForecast 对象,定义我们的特征。然后,我们可以在这个对象上调用 fit 方法,传入我们的 dask 数据框。

fcst = DistributedMLForecast(
    models=models,
    freq='D',
    target_transforms=[Differences([7])],
    lags=[7],
    lag_transforms={
        1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
        7: [RollingMean(window_size=14)],
    },
    date_features=['dayofweek', 'month'],
    num_threads=1,
    engine=client,
)
fcst.fit(partitioned_series)
import fugue.api as fa
from fastcore.test import test_eq
# 用于测试partition_results数据的函数
# 尺寸合适
def test_partition_results_size(fcst_object, expected_n_partitions):
    test_eq(
        fa.get_num_partitions(fcst_object._partition_results),
        expected_n_partitions,
    )
    test_eq(
        fa.count(fcst_object._partition_results),
        expected_n_partitions,
    )
test_partition_results_size(fcst, npartitions)
# 测试num_partitions功能是否正常运行
if sys.version_info >= (3, 9):
    num_partitions_test = 4
    test_dd = dd.from_pandas(series, npartitions=num_partitions_test) # 在这种情况下,我们不需要指定列。
    test_dd['unique_id'] = test_dd['unique_id'].astype(str)
    fcst_np = DistributedMLForecast(
        models=models,
        freq='D',
        target_transforms=[Differences([7])],    
        lags=[7],
        lag_transforms={
            1: [ExpandingMean()],
            7: [RollingMean(window_size=14)]
        },
        date_features=['dayofweek', 'month'],
        num_threads=1,
        engine=client,
        num_partitions=num_partitions_test
    )
    fcst_np.fit(test_dd)
    test_partition_results_size(fcst_np, num_partitions_test)
    preds_np = fcst_np.predict(7).compute().sort_values(['unique_id', 'ds']).reset_index(drop=True)
    preds = fcst.predict(7).compute().sort_values(['unique_id', 'ds']).reset_index(drop=True)
    pd.testing.assert_frame_equal(
        preds[['unique_id', 'ds']], 
        preds_np[['unique_id', 'ds']], 
    )

一旦我们得到了拟合的模型,就可以计算接下来的7个时间步的预测值。

预测

preds = fcst.predict(7).compute()
preds.head()
unique_id ds DaskXGBForecast DaskLGBMForecast
0 id_00 2002-09-27 00:00:00 22.489947 21.679944
1 id_00 2002-09-28 00:00:00 81.806826 84.151205
2 id_00 2002-09-29 00:00:00 162.705641 164.024508
3 id_00 2002-09-30 00:00:00 246.990386 246.099977
4 id_00 2002-10-01 00:00:00 314.741463 315.261537
preds2 = fcst.predict(7).compute()
preds3 = fcst.predict(7, new_df=partitioned_series).compute()
pd.testing.assert_frame_equal(preds, preds2)
pd.testing.assert_frame_equal(preds, preds3)
# 测试 X_df
prices = generate_prices_for_series(series)
series_wexog = series.merge(prices, on=['unique_id', 'ds'])
npartitions = 10
partitioned_series_exog = dd.from_pandas(series_wexog.set_index('unique_id'), npartitions=npartitions)
partitioned_series_exog = partitioned_series_exog.map_partitions(lambda df: df.reset_index())
partitioned_series_exog['unique_id'] = partitioned_series_exog['unique_id'].astype(str)
fcst_exog = DistributedMLForecast(
    models=models,
    freq='D',
    target_transforms=[Differences([7])],    
    lags=[7],
    lag_transforms={
        1: [ExpandingMean()],
        7: [RollingMean(window_size=14)]
    },
    date_features=['dayofweek', 'month'],
    num_threads=1,
    engine=client,
)
fcst_exog.fit(partitioned_series_exog, static_features=['static_0', 'static_1'])
preds_exog = fcst_exog.predict(h=7, X_df=prices).compute()
full_preds = preds.merge(preds_exog, on=['unique_id', 'ds'], suffixes=('', '_exog'))
for model in ('DaskXGBForecast', 'DaskLGBMForecast'):
    pct_diff = abs(1 - full_preds[f'{model}_exog'].div(full_preds[f'{model}']).mean())
    assert 0 < pct_diff < 0.1

保存和加载

一旦你训练好了模型,可以使用 DistributedMLForecast.save 方法来保存推理所需的工件。请记住,如果你在远程集群上,你应该将远程存储(如 S3)设置为目标。

mlforecast 使用 fsspec 来处理不同的文件系统,因此如果你使用 S3,例如,你还需要安装 s3fs。如果你使用 pip,只需包含 aws 额外选项,例如 pip install 'mlforecast[aws,dask]',这将安装执行分布式训练和保存到 S3 所需的依赖项。如果你使用 conda,则需要手动安装它们(conda install dask fsspec fugue s3fs)。

# 为持续集成定义唯一名称
def build_unique_name(engine):
    pyver = f'{sys.version_info.major}_{sys.version_info.minor}'
    repo = git.Repo(search_parent_directories=True)
    sha = repo.head.object.hexsha
    return f'{sys.platform}-{pyver}-{engine}-{sha}'
save_dir = build_unique_name('dask')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
tmpdir = tempfile.TemporaryDirectory()
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)

一旦您保存了预测对象,您就可以通过指定保存路径以及将用于执行分布式计算的引擎(在这种情况下是dask客户端)将其加载回来。

fcst2 = DistributedMLForecast.load(save_path, engine=client)

我们可以验证这个对象产生相同的结果。

preds = fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)
class Lag1Model(BaseEstimator):
    def fit(self, X, y):
        return self

    @property
    def model_(self):
        return self

    def predict(self, X):
        return X['lag1']

upd_fcst = DistributedMLForecast(
    models=[Lag1Model()],
    freq='D',
    lags=[1],
    num_threads=1,
    engine=client,
)
upd_fcst.fit(partitioned_series)

new_df = (series.groupby('unique_id', observed=True)['ds'].max() + pd.offsets.Day()).reset_index()
new_df['y'] = -1.0
upd_fcst.update(new_df)
expected = new_df.rename(columns={'y': 'Lag1Model'})
expected = expected.astype({'unique_id': str})
expected['ds'] += pd.offsets.Day()
upd_preds = upd_fcst.predict(1).compute()
pd.testing.assert_frame_equal(
    upd_preds.reset_index(drop=True),
    expected.reset_index(drop=True),
    check_dtype=False,
)

转换为本地

另一种存储分布式预测对象的选项是先将其转变为本地对象,然后再保存。请注意,为了做到这一点,所有从系列中存储的远程数据必须拉取到一台机器上(例如dask中的调度器、spark中的驱动程序等),因此您必须确保它能够适应内存,它的消耗大约是目标列大小的2倍(您可以通过在fit方法中使用keep_last_n参数进一步减少这一点)。

local_fcst = fcst.to_local()
local_preds = local_fcst.predict(10)
# 我们不检查数据类型,因为有时这些是箭头数据类型。
# 或不同精度的浮点数
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)
# 测试无目标变换的本地化
fcst_no_targ_tfms = DistributedMLForecast(
    models=[DaskXGBForecast(n_estimators=5, random_state=0)],
    freq='D',    
    lags=[1],
    lag_transforms={1: [ExpandingMean()]},
    date_features=['dayofweek'],
)
fcst_no_targ_tfms.fit(
    partitioned_series,
    static_features=['static_0', 'static_1'],
)
local_fcst = fcst_no_targ_tfms.to_local()
assert local_fcst.ts.target_transforms is None

交叉验证

cv_res = fcst.cross_validation(
    partitioned_series,
    n_windows=3,
    h=14,
)
cv_res.compute().head()
unique_id ds DaskXGBForecast DaskLGBMForecast cutoff y
17 id_01 2002-08-19 00:00:00 224.458336 222.742605 2002-08-15 00:00:00 210.723139
43 id_03 2002-08-17 00:00:00 2.235601 2.210624 2002-08-15 00:00:00 2.416967
44 id_03 2002-08-18 00:00:00 3.276747 3.239702 2002-08-15 00:00:00 3.060194
119 id_08 2002-08-23 00:00:00 131.261689 131.180289 2002-08-15 00:00:00 138.668463
131 id_09 2002-08-21 00:00:00 27.716417 28.263963 2002-08-15 00:00:00 22.88374
from mlforecast.distributed.forecast import WindowInfo
# 输入大小
input_size = 100
reduced_train = fcst._preprocess(
    partitioned_series,
    id_col='unique_id',
    time_col='ds',
    target_col='y',
    dropna=False,
    window_info=WindowInfo(
        n_windows=1,
        window_size=10,
        step_size=None,
        i_window=0,
        input_size=input_size,
    ),
)
assert reduced_train.groupby('unique_id').size().compute().max() == input_size
cv_res_no_refit = fcst.cross_validation(
    partitioned_series,
    n_windows=3,
    h=14,
    refit=False
)
cv_results_df = cv_res.compute().sort_values(['unique_id', 'ds'])
cv_results_no_refit_df = cv_res_no_refit.compute().sort_values(['unique_id', 'ds'])
# 测试我们恢复相同 "metadata"
models = ['DaskXGBForecast', 'DaskLGBMForecast']
test_eq(
    cv_results_no_refit_df.drop(columns=models),
    cv_results_df.drop(columns=models)
)
non_std_series = partitioned_series.copy()
non_std_series = non_std_series.rename(columns={'ds': 'time', 'y': 'value', 'unique_id': 'some_id'})
flow_params = dict(
    models=[DaskXGBForecast(random_state=0)],
    target_transforms=[Differences([7])],    
    lags=[7],
    lag_transforms={
        1: [ExpandingMean()],
        7: [RollingMean(window_size=14)]
    },
    num_threads=1,
)
fcst = DistributedMLForecast(freq='D', **flow_params)
fcst.fit(partitioned_series)
preds = fcst.predict(7).compute()
fcst2 = DistributedMLForecast(freq='D', **flow_params)
fcst2.preprocess(non_std_series, id_col='some_id', time_col='time', target_col='value')
fcst2.models_ = fcst.models_  # 分布式训练最终可能会得到不同的拟合结果。
non_std_preds = fcst2.predict(7).compute()
pd.testing.assert_frame_equal(
    preds.drop(columns='ds'),
    non_std_preds.drop(columns='time').rename(columns={'some_id': 'unique_id'})
)
client.close()

火花

会话设置

from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .config("spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.10.2")
    .config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
    .getOrCreate()
)

数据设置

对于Spark,数据必须是一个 pyspark DataFrame。你需要确保每个时间序列只在一个分区中(例如,可以使用 repartitionByRange 来实现),并且建议分区的数量与工作节点的数量相等。如果分区数量超过工作节点,请确保设置 num_threads=1 以避免嵌套并行。

所需的输入格式与 MLForecast 相同,即应该至少包含一个ID列、一个时间列和一个目标列。

numPartitions = 4
series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False)
spark_series = spark.createDataFrame(series).repartitionByRange(numPartitions, 'unique_id')

模型

为了执行分布式预测,我们需要使用一个能够通过spark以分布式方式进行训练的模型。目前的实现是SparkLGBMForecastSparkXGBForecast,它们只是对原生实现的包装。

from mlforecast.distributed.models.spark.lgb import SparkLGBMForecast
from mlforecast.distributed.models.spark.xgb import SparkXGBForecast
models = [SparkLGBMForecast(seed=0), SparkXGBForecast(random_state=0)]

训练

fcst = DistributedMLForecast(
    models,
    freq='D',
    target_transforms=[Differences([7])],    
    lags=[1],
    lag_transforms={
        1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
    },
    date_features=['dayofweek'],
)
fcst.fit(
    spark_series,
    static_features=['static_0', 'static_1'],
)
test_partition_results_size(fcst, numPartitions)
# 测试num_partitions功能是否正常
test_spark_df = spark.createDataFrame(series)
num_partitions_test = 10
fcst_np = DistributedMLForecast(
    models=models,
    freq='D',    
    lags=[7],
    lag_transforms={
        1: [ExpandingMean()],
        7: [RollingMean(window_size=14)]
    },
    date_features=['dayofweek', 'month'],
    num_threads=1,
    num_partitions=num_partitions_test,
)
fcst_np.fit(test_spark_df)
test_partition_results_size(fcst_np, num_partitions_test)
preds_np = fcst_np.predict(7).toPandas().sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds = fcst.predict(7).toPandas().sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(
    preds[['unique_id', 'ds']], 
    preds_np[['unique_id', 'ds']], 
)

预测

preds = fcst.predict(14).toPandas()
preds.head()
unique_id ds SparkLGBMForecast SparkXGBForecast
0 id_00 2001-05-15 430.964632 431.202969
1 id_00 2001-05-16 505.411960 504.030227
2 id_00 2001-05-17 9.889056 9.706636
3 id_00 2001-05-18 99.359694 96.258271
4 id_00 2001-05-19 196.307731 197.443618

保存和加载

一旦您训练了模型,可以使用 DistributedMLForecast.save 方法保存推理所需的工件。请记住,如果您使用的是远程集群,您应该将 S3 等远程存储设置为目标。

mlforecast 使用 fsspec 来处理不同的文件系统,因此如果您例如使用 S3,您还需要安装 s3fs。如果您使用 pip,您只需包含 aws 附加项,例如 pip install 'mlforecast[aws,spark]',这将安装执行分布式训练和保存到 S3 所需的依赖项。如果您使用 conda,您必须手动安装这些依赖项(conda install fsspec fugue pyspark s3fs)。

save_dir = build_unique_name('spark')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)
                                                                                

一旦您保存了预测对象,您可以通过指定保存路径以及将用于执行分布式计算的引擎(在本例中是Spark会话)来重新加载它。

fcst2 = DistributedMLForecast.load(save_path, engine=spark)
                                                                                

我们可以验证这个对象产生相同的结果。

preds = fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)
                                                                                

转换为本地

存储您的分布式预测对象的另一个选项是先将其转换为本地对象,然后再保存。请注意,为了实现这一点,所有从系列中存储的远程数据都必须被拉入到单台机器上(dask中的调度器,spark中的驱动程序等),因此您必须确保数据能够适应内存,它的大小应该大约是您目标列大小的2倍(您可以通过在fit方法中使用keep_last_n参数进一步减少这个大小)。

local_fcst = fcst.to_local()
local_preds = local_fcst.predict(10)
# 我们不检查数据类型,因为有时这些是箭头数据类型。
# 或不同精度的浮点数
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)

交叉验证

cv_res = fcst.cross_validation(
    spark_series,
    n_windows=3,
    h=14,
).toPandas()
cv_res.head()
unique_id ds SparkLGBMForecast SparkXGBForecast cutoff y
0 id_15 2001-04-04 88.438691 86.105463 2001-04-02 92.468763
1 id_25 2001-04-12 355.712493 354.525400 2001-04-02 320.701359
2 id_03 2001-04-08 257.243845 253.834157 2001-04-02 274.420045
3 id_14 2001-04-07 24.925278 23.833504 2001-04-02 26.906679
4 id_01 2001-04-16 89.180665 90.743194 2001-04-02 93.807725
spark.stop()

会话设置

import ray
from ray.cluster_utils import Cluster
ray_cluster = Cluster(
    initialize_head=True,
    head_node_args={"num_cpus": 2}
)
ray.init(address=ray_cluster.address, ignore_reinit_error=True)
# 添加模拟节点以模拟集群
mock_node = ray_cluster.add_node(num_cpus=2)

数据设置

对于ray,数据必须是一个ray DataFrame。建议你拥有与工作线程数量相同的分区。如果你有的分区数量多于工作线程,请确保设置num_threads=1以避免嵌套并行。

所需的输入格式与MLForecast相同,即至少应有一个id列,一个时间列和一个目标列。

series = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False)
# 我们需要非类别的唯一标识符。
series['unique_id'] = series['unique_id'].astype(str)
ray_series = ray.data.from_pandas(series)

模型

Ray集成允许包含lightgbmRayLGBMRegressor)和xgboostRayXGBRegressor)。

from mlforecast.distributed.models.ray.lgb import RayLGBMForecast
from mlforecast.distributed.models.ray.xgb import RayXGBForecast
models = [RayLGBMForecast(random_state=0), RayXGBForecast(random_state=0)]

训练

要控制使用Ray的分区数量,我们必须在DistributedMLForecast中包含num_partitions

num_partitions = 4
fcst = DistributedMLForecast(
    models,
    freq='D',
    target_transforms=[Differences([7])],
    lags=[1],
    lag_transforms={
        1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
    },
    date_features=['dayofweek'],
    num_partitions=num_partitions, # 使用 num_partitions 来减少开销
)
fcst.fit(
    ray_series,
    static_features=['static_0', 'static_1'],
)
test_partition_results_size(fcst, num_partitions)
# 测试num_partitions功能是否正常
# 在这种情况下,我们测试默认行为。 
# 对于光线数据集,其工作方式符合预期。
fcst_np = DistributedMLForecast(
    models=models,
    freq='D',
    lags=[7],
    lag_transforms={
        1: [ExpandingMean()],
        7: [RollingMean(window_size=14)]
    },
    date_features=['dayofweek', 'month'],
    num_threads=1,
)
fcst_np.fit(ray_series)
# 我们不使用test_partition_results_size。
# 由于物体的数量不同 
# 从分区数量来看
test_eq(fa.count(fcst_np._partition_results), 100) # 系列数量
preds_np = fcst_np.predict(7).to_pandas().sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds = fcst.predict(7).to_pandas().sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(
    preds[['unique_id', 'ds']], 
    preds_np[['unique_id', 'ds']], 
)

预测

preds = fcst.predict(14).to_pandas()
preds.head()
unique_id ds RayLGBMForecast RayXGBForecast
0 id_01 2001-05-15 118.505341 118.32222
1 id_01 2001-05-16 152.321457 152.265915
2 id_01 2001-05-17 181.979599 181.945618
3 id_01 2001-05-18 9.530758 9.543224
4 id_01 2001-05-19 40.503441 40.661186

保存和加载

训练完模型后,可以使用 DistributedMLForecast.save 方法保存用于推理的工件。请记住,如果您在远程集群上,则应将远程存储(如 S3)设置为目标。

mlforecast 使用 fsspec 来处理不同的文件系统,因此如果您使用 s3,您还需要安装 s3fs。如果您使用 pip,只需包含 aws 附加项,例如 pip install 'mlforecast[aws,ray]',这将安装使用 ray 进行分布式训练和保存到 S3 所需的依赖项。如果您使用 conda,您需要手动安装它们(conda install fsspec fugue ray s3fs)。

save_dir = build_unique_name('ray')
save_path = f's3://nixtla-tmp/mlf/{save_dir}'
try:
    s3fs.S3FileSystem().ls('s3://nixtla-tmp/')
    fcst.save(save_path)
except Exception as e:
    print(e)
    save_path = f'{tmpdir.name}/{save_dir}'
    fcst.save(save_path)

一旦你保存了你的预测对象,就可以通过指定保存路径和一个将用于执行分布式计算的引擎(在这种情况下是’ray’字符串)将其加载回来。

fcst2 = DistributedMLForecast.load(save_path, engine='ray')

我们可以验证这个对象生成相同的结果。

preds = fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
pd.testing.assert_frame_equal(preds, preds2)

转换为本地

将您的分布式预测对象存储的另一种选择是先将其转换为本地对象,然后保存。请记住,为了做到这一点,从系列中存储的所有远程数据必须拉回到单个机器上(dask 中的调度器、spark 中的驱动程序等),所以您必须确保其可以容纳在内存中,这将消耗大约 2 倍于目标列的大小(您可以通过在 fit 方法中使用 keep_last_n 参数进一步减少这个大小)。

local_fcst = fcst.to_local()
local_preds = local_fcst.predict(10)
# 我们不检查数据类型,因为有时这些是箭头数据类型。
# 或不同精度的浮点数
pd.testing.assert_frame_equal(preds, local_preds, check_dtype=False)

交叉验证

cv_res = fcst.cross_validation(
    ray_series,
    n_windows=3,
    h=14,
).to_pandas()
cv_res.head()
unique_id ds RayLGBMForecast RayXGBForecast cutoff y
0 id_10 2001-05-01 24.767561 24.528799 2001-04-30 31.878545
1 id_10 2001-05-07 1.916985 2.323445 2001-04-30 7.365955
2 id_13 2001-05-01 210.900330 212.959320 2001-04-30 190.485236
3 id_14 2001-05-01 196.620819 196.253036 2001-04-30 213.631212
4 id_14 2001-05-03 323.323334 322.372894 2001-04-30 338.234837
ray.shutdown()

Give us a ⭐ on Github