%load_ext autoreload
%autoreload 2
快速开始(分布式)
使用MLForecast进行分布式训练的最小示例
DistributedMLForecast
类是一个高级抽象,封装了管道中的所有步骤(预处理、拟合模型和计算预测),并以分布式方式应用这些步骤。
使用 DistributedMLForecast
(而不是 MLForecast
)所需的不同事项包括:
- 你需要搭建一个集群。我们目前支持 dask、ray 和 spark。
- 你的数据需要是一个分布式集合(dask、ray 或 spark 数据框)。
- 你需要使用在你选择的框架中实现分布式训练的模型,例如 spark 中的 LightGBM 的 SynapseML。
import platform
import sys
import tempfile
import matplotlib.pyplot as plt
import git
import numpy as np
import pandas as pd
import s3fs
from sklearn.base import BaseEstimator
from mlforecast.distributed import DistributedMLForecast
from mlforecast.lag_transforms import ExpandingMean, ExponentiallyWeightedMean, RollingMean
from mlforecast.target_transforms import Differences
from mlforecast.utils import generate_daily_series, generate_prices_for_series
Dask
import dask.dataframe as dd
from dask.distributed import Client
客户端设置
= Client(n_workers=2, threads_per_worker=1) client
在这里,我们定义了一个连接到 dask.distributed.LocalCluster
的客户端,但它也可以是任何其他类型的集群。
数据设置
对于dask,数据必须是一个 dask.dataframe.DataFrame
。你需要确保每个时间序列只在一个分区中,并且建议你的分区数量与工作节点数相同。如果你的分区数量超过工作节点,请确保将 num_threads
设置为1,以避免嵌套的并行性。
所需的输入格式与 MLForecast
相同,只不过它是一个 dask.dataframe.DataFrame
而不是 pandas.Dataframe
。
= generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False, min_length=500, max_length=1_000)
series = 10
npartitions = dd.from_pandas(series.set_index('unique_id'), npartitions=npartitions) # 确保我们按 id_col 进行拆分
partitioned_series = partitioned_series.map_partitions(lambda df: df.reset_index())
partitioned_series 'unique_id'] = partitioned_series['unique_id'].astype(str) # 目前无法处理分类数据
partitioned_series[ partitioned_series
unique_id | ds | y | static_0 | static_1 | |
---|---|---|---|---|---|
npartitions=10 | |||||
id_00 | object | datetime64[ns] | float64 | int64 | int64 |
id_10 | ... | ... | ... | ... | ... |
... | ... | ... | ... | ... | ... |
id_90 | ... | ... | ... | ... | ... |
id_99 | ... | ... | ... | ... | ... |
模型
为了进行分布式预测,我们需要使用能够通过dask
进行分布式训练的模型。目前的实现有DaskLGBMForecast
和DaskXGBForecast
,它们只是对原生实现的封装。
from mlforecast.distributed.models.dask.lgb import DaskLGBMForecast
from mlforecast.distributed.models.dask.xgb import DaskXGBForecast
= [DaskXGBForecast(random_state=0), DaskLGBMForecast(random_state=0)] models
训练
一旦我们有了模型,我们就实例化一个 DistributedMLForecast
对象,定义我们的特征。然后,我们可以在这个对象上调用 fit
方法,传入我们的 dask 数据框。
= DistributedMLForecast(
fcst =models,
models='D',
freq=[Differences([7])],
target_transforms=[7],
lags={
lag_transforms1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
7: [RollingMean(window_size=14)],
},=['dayofweek', 'month'],
date_features=1,
num_threads=client,
engine
) fcst.fit(partitioned_series)
import fugue.api as fa
from fastcore.test import test_eq
# 用于测试partition_results数据的函数
# 尺寸合适
def test_partition_results_size(fcst_object, expected_n_partitions):
test_eq(
fa.get_num_partitions(fcst_object._partition_results),
expected_n_partitions,
)
test_eq(
fa.count(fcst_object._partition_results),
expected_n_partitions, )
test_partition_results_size(fcst, npartitions)
# 测试num_partitions功能是否正常运行
if sys.version_info >= (3, 9):
= 4
num_partitions_test = dd.from_pandas(series, npartitions=num_partitions_test) # 在这种情况下,我们不需要指定列。
test_dd 'unique_id'] = test_dd['unique_id'].astype(str)
test_dd[= DistributedMLForecast(
fcst_np =models,
models='D',
freq=[Differences([7])],
target_transforms=[7],
lags={
lag_transforms1: [ExpandingMean()],
7: [RollingMean(window_size=14)]
},=['dayofweek', 'month'],
date_features=1,
num_threads=client,
engine=num_partitions_test
num_partitions
)
fcst_np.fit(test_dd)
test_partition_results_size(fcst_np, num_partitions_test)= fcst_np.predict(7).compute().sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds_np = fcst.predict(7).compute().sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds
pd.testing.assert_frame_equal('unique_id', 'ds']],
preds[['unique_id', 'ds']],
preds_np[[ )
一旦我们得到了拟合的模型,就可以计算接下来的7个时间步的预测值。
预测
= fcst.predict(7).compute()
preds preds.head()
unique_id | ds | DaskXGBForecast | DaskLGBMForecast | |
---|---|---|---|---|
0 | id_00 | 2002-09-27 00:00:00 | 22.489947 | 21.679944 |
1 | id_00 | 2002-09-28 00:00:00 | 81.806826 | 84.151205 |
2 | id_00 | 2002-09-29 00:00:00 | 162.705641 | 164.024508 |
3 | id_00 | 2002-09-30 00:00:00 | 246.990386 | 246.099977 |
4 | id_00 | 2002-10-01 00:00:00 | 314.741463 | 315.261537 |
= fcst.predict(7).compute()
preds2 = fcst.predict(7, new_df=partitioned_series).compute()
preds3
pd.testing.assert_frame_equal(preds, preds2) pd.testing.assert_frame_equal(preds, preds3)
# 测试 X_df
= generate_prices_for_series(series)
prices = series.merge(prices, on=['unique_id', 'ds'])
series_wexog = 10
npartitions = dd.from_pandas(series_wexog.set_index('unique_id'), npartitions=npartitions)
partitioned_series_exog = partitioned_series_exog.map_partitions(lambda df: df.reset_index())
partitioned_series_exog 'unique_id'] = partitioned_series_exog['unique_id'].astype(str)
partitioned_series_exog[= DistributedMLForecast(
fcst_exog =models,
models='D',
freq=[Differences([7])],
target_transforms=[7],
lags={
lag_transforms1: [ExpandingMean()],
7: [RollingMean(window_size=14)]
},=['dayofweek', 'month'],
date_features=1,
num_threads=client,
engine
)=['static_0', 'static_1'])
fcst_exog.fit(partitioned_series_exog, static_features= fcst_exog.predict(h=7, X_df=prices).compute()
preds_exog = preds.merge(preds_exog, on=['unique_id', 'ds'], suffixes=('', '_exog'))
full_preds for model in ('DaskXGBForecast', 'DaskLGBMForecast'):
= abs(1 - full_preds[f'{model}_exog'].div(full_preds[f'{model}']).mean())
pct_diff assert 0 < pct_diff < 0.1
保存和加载
一旦你训练好了模型,可以使用 DistributedMLForecast.save
方法来保存推理所需的工件。请记住,如果你在远程集群上,你应该将远程存储(如 S3)设置为目标。
mlforecast 使用 fsspec 来处理不同的文件系统,因此如果你使用 S3,例如,你还需要安装 s3fs。如果你使用 pip,只需包含 aws 额外选项,例如 pip install 'mlforecast[aws,dask]'
,这将安装执行分布式训练和保存到 S3 所需的依赖项。如果你使用 conda,则需要手动安装它们(conda install dask fsspec fugue s3fs
)。
# 为持续集成定义唯一名称
def build_unique_name(engine):
= f'{sys.version_info.major}_{sys.version_info.minor}'
pyver = git.Repo(search_parent_directories=True)
repo = repo.head.object.hexsha
sha return f'{sys.platform}-{pyver}-{engine}-{sha}'
= build_unique_name('dask')
save_dir = f's3://nixtla-tmp/mlf/{save_dir}'
save_path = tempfile.TemporaryDirectory()
tmpdir try:
's3://nixtla-tmp/')
s3fs.S3FileSystem().ls(
fcst.save(save_path)except Exception as e:
print(e)
= f'{tmpdir.name}/{save_dir}'
save_path fcst.save(save_path)
一旦您保存了预测对象,您就可以通过指定保存路径以及将用于执行分布式计算的引擎(在这种情况下是dask客户端)将其加载回来。
= DistributedMLForecast.load(save_path, engine=client) fcst2
我们可以验证这个对象产生相同的结果。
= fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 pd.testing.assert_frame_equal(preds, preds2)
class Lag1Model(BaseEstimator):
def fit(self, X, y):
return self
@property
def model_(self):
return self
def predict(self, X):
return X['lag1']
= DistributedMLForecast(
upd_fcst =[Lag1Model()],
models='D',
freq=[1],
lags=1,
num_threads=client,
engine
)
upd_fcst.fit(partitioned_series)
= (series.groupby('unique_id', observed=True)['ds'].max() + pd.offsets.Day()).reset_index()
new_df 'y'] = -1.0
new_df[
upd_fcst.update(new_df)= new_df.rename(columns={'y': 'Lag1Model'})
expected = expected.astype({'unique_id': str})
expected 'ds'] += pd.offsets.Day()
expected[= upd_fcst.predict(1).compute()
upd_preds
pd.testing.assert_frame_equal(=True),
upd_preds.reset_index(drop=True),
expected.reset_index(drop=False,
check_dtype )
转换为本地
另一种存储分布式预测对象的选项是先将其转变为本地对象,然后再保存。请注意,为了做到这一点,所有从系列中存储的远程数据必须拉取到一台机器上(例如dask中的调度器、spark中的驱动程序等),因此您必须确保它能够适应内存,它的消耗大约是目标列大小的2倍(您可以通过在fit
方法中使用keep_last_n
参数进一步减少这一点)。
= fcst.to_local()
local_fcst = local_fcst.predict(10)
local_preds # 我们不检查数据类型,因为有时这些是箭头数据类型。
# 或不同精度的浮点数
=False) pd.testing.assert_frame_equal(preds, local_preds, check_dtype
# 测试无目标变换的本地化
= DistributedMLForecast(
fcst_no_targ_tfms =[DaskXGBForecast(n_estimators=5, random_state=0)],
models='D',
freq=[1],
lags={1: [ExpandingMean()]},
lag_transforms=['dayofweek'],
date_features
)
fcst_no_targ_tfms.fit(
partitioned_series,=['static_0', 'static_1'],
static_features
)= fcst_no_targ_tfms.to_local()
local_fcst assert local_fcst.ts.target_transforms is None
交叉验证
= fcst.cross_validation(
cv_res
partitioned_series,=3,
n_windows=14,
h )
cv_res.compute().head()
unique_id | ds | DaskXGBForecast | DaskLGBMForecast | cutoff | y | |
---|---|---|---|---|---|---|
17 | id_01 | 2002-08-19 00:00:00 | 224.458336 | 222.742605 | 2002-08-15 00:00:00 | 210.723139 |
43 | id_03 | 2002-08-17 00:00:00 | 2.235601 | 2.210624 | 2002-08-15 00:00:00 | 2.416967 |
44 | id_03 | 2002-08-18 00:00:00 | 3.276747 | 3.239702 | 2002-08-15 00:00:00 | 3.060194 |
119 | id_08 | 2002-08-23 00:00:00 | 131.261689 | 131.180289 | 2002-08-15 00:00:00 | 138.668463 |
131 | id_09 | 2002-08-21 00:00:00 | 27.716417 | 28.263963 | 2002-08-15 00:00:00 | 22.88374 |
from mlforecast.distributed.forecast import WindowInfo
# 输入大小
= 100
input_size = fcst._preprocess(
reduced_train
partitioned_series,='unique_id',
id_col='ds',
time_col='y',
target_col=False,
dropna=WindowInfo(
window_info=1,
n_windows=10,
window_size=None,
step_size=0,
i_window=input_size,
input_size
),
)assert reduced_train.groupby('unique_id').size().compute().max() == input_size
= fcst.cross_validation(
cv_res_no_refit
partitioned_series,=3,
n_windows=14,
h=False
refit
)= cv_res.compute().sort_values(['unique_id', 'ds'])
cv_results_df = cv_res_no_refit.compute().sort_values(['unique_id', 'ds'])
cv_results_no_refit_df # 测试我们恢复相同 "metadata"
= ['DaskXGBForecast', 'DaskLGBMForecast']
models
test_eq(=models),
cv_results_no_refit_df.drop(columns=models)
cv_results_df.drop(columns )
= partitioned_series.copy()
non_std_series = non_std_series.rename(columns={'ds': 'time', 'y': 'value', 'unique_id': 'some_id'})
non_std_series = dict(
flow_params =[DaskXGBForecast(random_state=0)],
models=[Differences([7])],
target_transforms=[7],
lags={
lag_transforms1: [ExpandingMean()],
7: [RollingMean(window_size=14)]
},=1,
num_threads
)= DistributedMLForecast(freq='D', **flow_params)
fcst
fcst.fit(partitioned_series)= fcst.predict(7).compute()
preds = DistributedMLForecast(freq='D', **flow_params)
fcst2 ='some_id', time_col='time', target_col='value')
fcst2.preprocess(non_std_series, id_col= fcst.models_ # 分布式训练最终可能会得到不同的拟合结果。
fcst2.models_ = fcst2.predict(7).compute()
non_std_preds
pd.testing.assert_frame_equal(='ds'),
preds.drop(columns='time').rename(columns={'some_id': 'unique_id'})
non_std_preds.drop(columns )
client.close()
火花
会话设置
from pyspark.sql import SparkSession
= (
spark
SparkSession
.builder"spark.jars.packages", "com.microsoft.azure:synapseml_2.12:0.10.2")
.config("spark.jars.repositories", "https://mmlspark.azureedge.net/maven")
.config(
.getOrCreate() )
数据设置
对于Spark,数据必须是一个 pyspark DataFrame
。你需要确保每个时间序列只在一个分区中(例如,可以使用 repartitionByRange
来实现),并且建议分区的数量与工作节点的数量相等。如果分区数量超过工作节点,请确保设置 num_threads=1
以避免嵌套并行。
所需的输入格式与 MLForecast
相同,即应该至少包含一个ID列、一个时间列和一个目标列。
= 4
numPartitions = generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False)
series = spark.createDataFrame(series).repartitionByRange(numPartitions, 'unique_id') spark_series
模型
为了执行分布式预测,我们需要使用一个能够通过spark
以分布式方式进行训练的模型。目前的实现是SparkLGBMForecast
和SparkXGBForecast
,它们只是对原生实现的包装。
from mlforecast.distributed.models.spark.lgb import SparkLGBMForecast
from mlforecast.distributed.models.spark.xgb import SparkXGBForecast
= [SparkLGBMForecast(seed=0), SparkXGBForecast(random_state=0)] models
训练
= DistributedMLForecast(
fcst
models,='D',
freq=[Differences([7])],
target_transforms=[1],
lags={
lag_transforms1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
},=['dayofweek'],
date_features
)
fcst.fit(
spark_series,=['static_0', 'static_1'],
static_features )
test_partition_results_size(fcst, numPartitions)
# 测试num_partitions功能是否正常
= spark.createDataFrame(series)
test_spark_df = 10
num_partitions_test = DistributedMLForecast(
fcst_np =models,
models='D',
freq=[7],
lags={
lag_transforms1: [ExpandingMean()],
7: [RollingMean(window_size=14)]
},=['dayofweek', 'month'],
date_features=1,
num_threads=num_partitions_test,
num_partitions
)
fcst_np.fit(test_spark_df)
test_partition_results_size(fcst_np, num_partitions_test)= fcst_np.predict(7).toPandas().sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds_np = fcst.predict(7).toPandas().sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds
pd.testing.assert_frame_equal('unique_id', 'ds']],
preds[['unique_id', 'ds']],
preds_np[[ )
预测
= fcst.predict(14).toPandas() preds
preds.head()
unique_id | ds | SparkLGBMForecast | SparkXGBForecast | |
---|---|---|---|---|
0 | id_00 | 2001-05-15 | 430.964632 | 431.202969 |
1 | id_00 | 2001-05-16 | 505.411960 | 504.030227 |
2 | id_00 | 2001-05-17 | 9.889056 | 9.706636 |
3 | id_00 | 2001-05-18 | 99.359694 | 96.258271 |
4 | id_00 | 2001-05-19 | 196.307731 | 197.443618 |
保存和加载
一旦您训练了模型,可以使用 DistributedMLForecast.save
方法保存推理所需的工件。请记住,如果您使用的是远程集群,您应该将 S3 等远程存储设置为目标。
mlforecast 使用 fsspec 来处理不同的文件系统,因此如果您例如使用 S3,您还需要安装 s3fs。如果您使用 pip,您只需包含 aws 附加项,例如 pip install 'mlforecast[aws,spark]'
,这将安装执行分布式训练和保存到 S3 所需的依赖项。如果您使用 conda,您必须手动安装这些依赖项(conda install fsspec fugue pyspark s3fs
)。
= build_unique_name('spark')
save_dir = f's3://nixtla-tmp/mlf/{save_dir}'
save_path try:
's3://nixtla-tmp/')
s3fs.S3FileSystem().ls(
fcst.save(save_path)except Exception as e:
print(e)
= f'{tmpdir.name}/{save_dir}'
save_path fcst.save(save_path)
一旦您保存了预测对象,您可以通过指定保存路径以及将用于执行分布式计算的引擎(在本例中是Spark会话)来重新加载它。
= DistributedMLForecast.load(save_path, engine=spark) fcst2
我们可以验证这个对象产生相同的结果。
= fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 pd.testing.assert_frame_equal(preds, preds2)
转换为本地
存储您的分布式预测对象的另一个选项是先将其转换为本地对象,然后再保存。请注意,为了实现这一点,所有从系列中存储的远程数据都必须被拉入到单台机器上(dask中的调度器,spark中的驱动程序等),因此您必须确保数据能够适应内存,它的大小应该大约是您目标列大小的2倍(您可以通过在fit
方法中使用keep_last_n
参数进一步减少这个大小)。
= fcst.to_local()
local_fcst = local_fcst.predict(10)
local_preds # 我们不检查数据类型,因为有时这些是箭头数据类型。
# 或不同精度的浮点数
=False) pd.testing.assert_frame_equal(preds, local_preds, check_dtype
交叉验证
= fcst.cross_validation(
cv_res
spark_series,=3,
n_windows=14,
h ).toPandas()
cv_res.head()
unique_id | ds | SparkLGBMForecast | SparkXGBForecast | cutoff | y | |
---|---|---|---|---|---|---|
0 | id_15 | 2001-04-04 | 88.438691 | 86.105463 | 2001-04-02 | 92.468763 |
1 | id_25 | 2001-04-12 | 355.712493 | 354.525400 | 2001-04-02 | 320.701359 |
2 | id_03 | 2001-04-08 | 257.243845 | 253.834157 | 2001-04-02 | 274.420045 |
3 | id_14 | 2001-04-07 | 24.925278 | 23.833504 | 2001-04-02 | 26.906679 |
4 | id_01 | 2001-04-16 | 89.180665 | 90.743194 | 2001-04-02 | 93.807725 |
spark.stop()
雷
会话设置
import ray
from ray.cluster_utils import Cluster
= Cluster(
ray_cluster =True,
initialize_head={"num_cpus": 2}
head_node_args
)=ray_cluster.address, ignore_reinit_error=True)
ray.init(address# 添加模拟节点以模拟集群
= ray_cluster.add_node(num_cpus=2) mock_node
数据设置
对于ray,数据必须是一个ray DataFrame
。建议你拥有与工作线程数量相同的分区。如果你有的分区数量多于工作线程,请确保设置num_threads=1
以避免嵌套并行。
所需的输入格式与MLForecast
相同,即至少应有一个id列,一个时间列和一个目标列。
= generate_daily_series(100, n_static_features=2, equal_ends=True, static_as_categorical=False)
series # 我们需要非类别的唯一标识符。
'unique_id'] = series['unique_id'].astype(str)
series[= ray.data.from_pandas(series) ray_series
模型
Ray集成允许包含lightgbm
(RayLGBMRegressor
)和xgboost
(RayXGBRegressor
)。
from mlforecast.distributed.models.ray.lgb import RayLGBMForecast
from mlforecast.distributed.models.ray.xgb import RayXGBForecast
= [RayLGBMForecast(random_state=0), RayXGBForecast(random_state=0)] models
训练
要控制使用Ray的分区数量,我们必须在DistributedMLForecast
中包含num_partitions
。
= 4 num_partitions
= DistributedMLForecast(
fcst
models,='D',
freq=[Differences([7])],
target_transforms=[1],
lags={
lag_transforms1: [ExpandingMean(), ExponentiallyWeightedMean(alpha=0.9)],
},=['dayofweek'],
date_features=num_partitions, # 使用 num_partitions 来减少开销
num_partitions
)
fcst.fit(
ray_series,=['static_0', 'static_1'],
static_features )
test_partition_results_size(fcst, num_partitions)
# 测试num_partitions功能是否正常
# 在这种情况下,我们测试默认行为。
# 对于光线数据集,其工作方式符合预期。
= DistributedMLForecast(
fcst_np =models,
models='D',
freq=[7],
lags={
lag_transforms1: [ExpandingMean()],
7: [RollingMean(window_size=14)]
},=['dayofweek', 'month'],
date_features=1,
num_threads
)
fcst_np.fit(ray_series)# 我们不使用test_partition_results_size。
# 由于物体的数量不同
# 从分区数量来看
100) # 系列数量
test_eq(fa.count(fcst_np._partition_results), = fcst_np.predict(7).to_pandas().sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds_np = fcst.predict(7).to_pandas().sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds
pd.testing.assert_frame_equal('unique_id', 'ds']],
preds[['unique_id', 'ds']],
preds_np[[ )
预测
= fcst.predict(14).to_pandas() preds
preds.head()
unique_id | ds | RayLGBMForecast | RayXGBForecast | |
---|---|---|---|---|
0 | id_01 | 2001-05-15 | 118.505341 | 118.32222 |
1 | id_01 | 2001-05-16 | 152.321457 | 152.265915 |
2 | id_01 | 2001-05-17 | 181.979599 | 181.945618 |
3 | id_01 | 2001-05-18 | 9.530758 | 9.543224 |
4 | id_01 | 2001-05-19 | 40.503441 | 40.661186 |
保存和加载
训练完模型后,可以使用 DistributedMLForecast.save
方法保存用于推理的工件。请记住,如果您在远程集群上,则应将远程存储(如 S3)设置为目标。
mlforecast 使用 fsspec 来处理不同的文件系统,因此如果您使用 s3,您还需要安装 s3fs。如果您使用 pip,只需包含 aws 附加项,例如 pip install 'mlforecast[aws,ray]'
,这将安装使用 ray 进行分布式训练和保存到 S3 所需的依赖项。如果您使用 conda,您需要手动安装它们(conda install fsspec fugue ray s3fs
)。
= build_unique_name('ray')
save_dir = f's3://nixtla-tmp/mlf/{save_dir}'
save_path try:
's3://nixtla-tmp/')
s3fs.S3FileSystem().ls(
fcst.save(save_path)except Exception as e:
print(e)
= f'{tmpdir.name}/{save_dir}'
save_path fcst.save(save_path)
一旦你保存了你的预测对象,就可以通过指定保存路径和一个将用于执行分布式计算的引擎(在这种情况下是’ray’字符串)将其加载回来。
= DistributedMLForecast.load(save_path, engine='ray') fcst2
我们可以验证这个对象生成相同的结果。
= fa.as_pandas(fcst.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds = fa.as_pandas(fcst2.predict(10)).sort_values(['unique_id', 'ds']).reset_index(drop=True)
preds2 pd.testing.assert_frame_equal(preds, preds2)
转换为本地
将您的分布式预测对象存储的另一种选择是先将其转换为本地对象,然后保存。请记住,为了做到这一点,从系列中存储的所有远程数据必须拉回到单个机器上(dask 中的调度器、spark 中的驱动程序等),所以您必须确保其可以容纳在内存中,这将消耗大约 2 倍于目标列的大小(您可以通过在 fit
方法中使用 keep_last_n
参数进一步减少这个大小)。
= fcst.to_local()
local_fcst = local_fcst.predict(10)
local_preds # 我们不检查数据类型,因为有时这些是箭头数据类型。
# 或不同精度的浮点数
=False) pd.testing.assert_frame_equal(preds, local_preds, check_dtype
交叉验证
= fcst.cross_validation(
cv_res
ray_series,=3,
n_windows=14,
h ).to_pandas()
cv_res.head()
unique_id | ds | RayLGBMForecast | RayXGBForecast | cutoff | y | |
---|---|---|---|---|---|---|
0 | id_10 | 2001-05-01 | 24.767561 | 24.528799 | 2001-04-30 | 31.878545 |
1 | id_10 | 2001-05-07 | 1.916985 | 2.323445 | 2001-04-30 | 7.365955 |
2 | id_13 | 2001-05-01 | 210.900330 | 212.959320 | 2001-04-30 | 190.485236 |
3 | id_14 | 2001-05-01 | 196.620819 | 196.253036 | 2001-04-30 | 213.631212 |
4 | id_14 | 2001-05-03 | 323.323334 | 322.372894 | 2001-04-30 | 338.234837 |
ray.shutdown()
Give us a ⭐ on Github