Transfer Learning for Time Series Forecasting with Darts¶
Authors: Julien Herzen, Florian Ravasi, Guillaume Raille, Gaël Grosch.
Overview¶
The goal of this notebook is to explore transfer learning for time series forecasting - that is, training forecasting models on one time series dataset and using them on another. The notebook is 100% self-contained - i.e., it also contains the commands needed to install the dependencies and download the datasets being used.
Depending on what constitutes the "learning task", what we call transfer learning here can also be seen as a form of meta-learning (or "learning to learn"), where models can adapt to new tasks (e.g., forecasting a new time series) at inference time without further training [1].
This notebook is an adaptation of a workshop on "Forecasting and Meta-Learning" given at the Applied Machine Learning Days conference held in Lausanne, Switzerland, in March 2022. It contains the following parts:
Part 0: Initial setup - imports, functions to download the data, etc.
Part 1: Forecasting the passenger counts series of 300 air carriers (the ``air`` dataset). We will train one model per series.
Part 2: Using "global" models - i.e., models trained jointly on all 300 series at once.
Part 3: We will try some transfer learning, and see what happens if we train some global models on one (large) dataset (the ``m4`` dataset) and then use them on another dataset.
Part 4: We will reuse the pre-trained models of Part 3 on another new dataset (the ``m3`` dataset), and see how they compare against models trained specifically on this dataset.
The computation durations reported for the different models were obtained by running the notebook on an Apple Silicon M2 CPU, with Python 3.10.13 and Darts 0.27.0.
Part 0: Setup¶
First, we need to have the right libraries and imports. For the deep learning models, having a GPU helps, but it is not required.
The following two cells only need to be run once. They install the dependencies and download all the required datasets.
[ ]:
!pip install xlrd
!pip install darts
[2]:
# Execute this cell once to download all three datasets
!curl -L https://forecasters.org/data/m3comp/M3C.xls -o m3_dataset.xls
!curl -L https://data.transportation.gov/api/views/xgub-n9bw/rows.csv -o carrier_passengers.csv
!curl -L https://raw.githubusercontent.com/Mcompetitions/M4-methods/master/Dataset/Train/Monthly-train.csv -o m4_monthly.csv
!curl -L https://raw.githubusercontent.com/Mcompetitions/M4-methods/master/Dataset/M4-info.csv -o m4_metadata.csv
Now let's import everything. Don't worry, we will explain what these imports mean as we go through the notebook :)
[3]:
%matplotlib inline
import warnings
warnings.filterwarnings("ignore")
import os
import time
import random
import pandas as pd
import pickle
import numpy as np
from tqdm.auto import tqdm
from datetime import datetime
from itertools import product
import torch
from torch import nn
from typing import List, Tuple, Dict, Optional
from sklearn.preprocessing import MaxAbsScaler
from sklearn.linear_model import Ridge
import matplotlib.pyplot as plt
from darts import TimeSeries
from darts.utils.losses import SmapeLoss
from darts.dataprocessing.transformers import Scaler
from darts.metrics import smape
from darts.utils.utils import SeasonalityMode, TrendMode, ModelMode
from darts.models import *
Here we define the forecast horizon - for all the (monthly) time series used in this notebook, we will be interested in forecasting 18 months in advance. We pick 18 months because this is the standard used for monthly series in the M3/M4 competitions.
[4]:
HORIZON = 18
Dataset loading methods¶
Here, we define some helper methods to load the three datasets we will be playing with: ``air``, ``m3`` and ``m4``.
All the methods below return two lists of ``TimeSeries``: one list of training series and one list of "test" series (of length ``HORIZON``).
For convenience, all the series here have already been scaled, by multiplying each of them by a constant so that the largest value is 1. This scaling is necessary for many models to work correctly (especially deep learning models). It does not affect sMAPE values, so we can still evaluate the accuracy of our algorithms on the scaled series. In a real application, we would have to keep the Darts ``Scaler`` objects somewhere in order to inverse-scale the forecasts; a minimal sketch of what this could look like is shown below.
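As a side note, here is a minimal sketch (not part of this notebook's pipeline) of keeping the ``Scaler`` around to map forecasts back to the original units. It uses the small ``AirPassengersDataset`` bundled with Darts (not one of the three datasets used here), and the model choice is arbitrary:
from sklearn.preprocessing import MaxAbsScaler
from darts.datasets import AirPassengersDataset
from darts.dataprocessing.transformers import Scaler
from darts.models import NaiveSeasonal

series = AirPassengersDataset().load()
train = series[:-18]

# scale so that the largest training value is 1
scaler = Scaler(scaler=MaxAbsScaler())
train_scaled = scaler.fit_transform(train)

# fit any model on the scaled series and forecast
model = NaiveSeasonal(K=12)
model.fit(train_scaled)
pred_scaled = model.predict(n=18)

# map the forecast back to the original units
pred = scaler.inverse_transform(pred_scaled)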
If you are interested in examples of how ``TimeSeries`` can be created and scaled, you can have a look at the load_m3() function.
[5]:
def load_m3() -> Tuple[List[TimeSeries], List[TimeSeries]]:
print("building M3 TimeSeries...")
# Read DataFrame
df_m3 = pd.read_excel("m3_dataset.xls", "M3Month")
# Build TimeSeries
m3_series = []
for row in tqdm(df_m3.iterrows()):
s = row[1]
start_year = int(s["Starting Year"])
start_month = int(s["Starting Month"])
values_series = s[6:].dropna()
if start_month == 0:
continue
start_date = datetime(year=start_year, month=start_month, day=1)
time_axis = pd.date_range(start_date, periods=len(values_series), freq="M")
series = TimeSeries.from_times_and_values(
time_axis, values_series.values
).astype(np.float32)
m3_series.append(series)
print("\nThere are {} monthly series in the M3 dataset".format(len(m3_series)))
# Split train/test
print("splitting train/test...")
m3_train = [s[:-HORIZON] for s in m3_series]
m3_test = [s[-HORIZON:] for s in m3_series]
# Scale so that the largest value is 1
print("scaling...")
scaler_m3 = Scaler(scaler=MaxAbsScaler())
m3_train_scaled: List[TimeSeries] = scaler_m3.fit_transform(m3_train)
m3_test_scaled: List[TimeSeries] = scaler_m3.transform(m3_test)
print(
"done. There are {} series, with average training length {}".format(
len(m3_train_scaled), np.mean([len(s) for s in m3_train_scaled])
)
)
return m3_train_scaled, m3_test_scaled
def load_air() -> Tuple[List[TimeSeries], List[TimeSeries]]:
# download csv file
df = pd.read_csv("carrier_passengers.csv")
# extract relevant columns
df = df[["data_dte", "carrier", "Total"]]
# aggregate per carrier and date
df = pd.DataFrame(df.groupby(["carrier", "data_dte"]).sum())
# move indexes to columns
df = df.reset_index()
    # group by carrier, specify time index and target variable
all_air_series = TimeSeries.from_group_dataframe(
df, group_cols="carrier", time_col="data_dte", value_cols="Total", freq="MS"
)
# Split train/test
print("splitting train/test...")
air_train = []
air_test = []
for series in all_air_series:
# remove the end of the series
series = series[: pd.Timestamp("2019-12-31")]
# convert to proper type
series = series.astype(np.float32)
# extract longest contiguous slice
try:
series = series.longest_contiguous_slice()
except:
continue
# remove static covariates
series = series.with_static_covariates(None)
# remove short series
if len(series) >= 36 + HORIZON:
air_train.append(series[:-HORIZON])
air_test.append(series[-HORIZON:])
# Scale so that the largest value is 1
print("scaling series...")
scaler_air = Scaler(scaler=MaxAbsScaler())
air_train_scaled: List[TimeSeries] = scaler_air.fit_transform(air_train)
air_test_scaled: List[TimeSeries] = scaler_air.transform(air_test)
print(
"done. There are {} series, with average training length {}".format(
len(air_train_scaled), np.mean([len(s) for s in air_train_scaled])
)
)
return air_train_scaled, air_test_scaled
def load_m4(
max_number_series: Optional[int] = None,
) -> Tuple[List[TimeSeries], List[TimeSeries]]:
"""
Due to the size of the dataset, this function takes approximately 10 minutes.
Use the `max_number_series` parameter to reduce the computation time if necessary
"""
# Read data dataFrame
df_m4 = pd.read_csv("m4_monthly.csv")
if max_number_series is not None:
df_m4 = df_m4[:max_number_series]
# Read metadata dataframe
df_meta = pd.read_csv("m4_metadata.csv")
df_meta = df_meta.loc[df_meta.SP == "Monthly"]
# Build TimeSeries
m4_train = []
m4_test = []
for row in tqdm(df_m4.iterrows(), total=len(df_m4)):
s = row[1]
values_series = s[1:].dropna()
start_date = pd.Timestamp(
df_meta.loc[df_meta["M4id"] == "M1", "StartingDate"].values[0]
)
time_axis = pd.date_range(start_date, periods=len(values_series), freq="M")
series = TimeSeries.from_times_and_values(
time_axis, values_series.values
).astype(np.float32)
        # only keep series with more than 48 training samples
if len(series) > 48 + HORIZON:
# Split train/test
m4_train.append(series[:-HORIZON])
m4_test.append(series[-HORIZON:])
print("\nThere are {} monthly series in the M3 dataset".format(len(m4_train)))
# Scale so that the largest value is 1
print("scaling...")
scaler_m4 = Scaler(scaler=MaxAbsScaler())
m4_train_scaled: List[TimeSeries] = scaler_m4.fit_transform(m4_train)
m4_test_scaled: List[TimeSeries] = scaler_m4.transform(m4_test)
print(
"done. There are {} series, with average training length {}".format(
len(m4_train_scaled), np.mean([len(s) for s in m4_train_scaled])
)
)
return m4_train_scaled, m4_test_scaled
Finally, we define a handy function that tells us how good a set of forecast series is:
[6]:
def eval_forecasts(
pred_series: List[TimeSeries], test_series: List[TimeSeries]
) -> List[float]:
print("computing sMAPEs...")
smapes = smape(test_series, pred_series)
plt.figure()
plt.hist(smapes, bins=50)
plt.ylabel("Count")
plt.xlabel("sMAPE")
plt.title("Median sMAPE: %.3f" % np.median(smapes))
plt.show()
plt.close()
return smapes
Part 1: Local models on the ``air`` dataset¶
Inspecting the data¶
The ``air`` dataset contains the number of passengers carried by each carrier (or airline) flying to or from the US, between 2000 and 2019.
First, we can load the training and test series by calling the load_air() function we defined above.
[7]:
air_train, air_test = load_air()
splitting train/test...
IndexError: The type of your index was not matched.
scaling series...
done. There are 245 series, with average training length 154.06938775510204
It is a good idea to start by visualising a few of the series, to get a sense of what they look like. We can plot a series by calling series.plot().
[8]:
figure, ax = plt.subplots(3, 2, figsize=(10, 10), dpi=100)
for i, idx in enumerate([1, 20, 50, 100, 150, 200]):
axis = ax[i % 3, i % 2]
air_train[idx].plot(ax=axis)
axis.legend(air_train[idx].components)
axis.set_title("")
plt.tight_layout()
We can see that most of the series look quite different, and they even have different time axes! For example, some series start in January 2001 while others start in April 2010.
Let's see what the shortest training series is:
[9]:
min([len(s) for s in air_train])
[9]:
36
A useful function to evaluate models¶
Below, we write a small function to make it easy to quickly try out and compare different local models. We iterate over each series, fit a model, and then evaluate it on our test dataset.
⚠️ tqdm is optional; it is only there to help display the training progress (as you will see, fitting 300+ time series can take some time).
[10]:
def eval_local_model(
train_series: List[TimeSeries], test_series: List[TimeSeries], model_cls, **kwargs
) -> Tuple[List[float], float]:
preds = []
start_time = time.time()
for series in tqdm(train_series):
model = model_cls(**kwargs)
model.fit(series)
pred = model.predict(n=HORIZON)
preds.append(pred)
elapsed_time = time.time() - start_time
smapes = eval_forecasts(preds, test_series)
return smapes, elapsed_time
Building and evaluating models¶
We can now try a first forecasting model on this dataset. As a first step, it is usually good practice to see how a (very) naive model that blindly repeats the last value of the training series performs. This can be done in Darts using a NaiveSeasonal model:
[11]:
naive1_smapes, naive1_time = eval_local_model(air_train, air_test, NaiveSeasonal, K=1)
computing sMAPEs...
So the most naive model gives us a median sMAPE of about 27.2.
Can we do better with a "less naive" model, exploiting the fact that most monthly series have a seasonality of 12?
[12]:
naive12_smapes, naive12_time = eval_local_model(
air_train, air_test, NaiveSeasonal, K=12
)
computing sMAPEs...
That's better. Let's try ExponentialSmoothing (by default, it will use a seasonality of 12 for monthly series):
[13]:
ets_smapes, ets_time = eval_local_model(air_train, air_test, ExponentialSmoothing)
computing sMAPEs...
The median is slightly better than that of the naive seasonal model. We can also quickly try another model, the Theta method, which won the M3 competition:
[14]:
theta_smapes, theta_time = eval_local_model(air_train, air_test, Theta, theta=1.5)
computing sMAPEs...
What about ARIMA?
[15]:
warnings.filterwarnings("ignore") # ARIMA generates lots of warnings
arima_smapes, arima_time = eval_local_model(air_train, air_test, ARIMA, p=12, d=1, q=1)
computing sMAPEs...
Or a Kalman filter? (In Darts, fitting a Kalman filter uses the N4SID system identification algorithm.)
[16]:
kf_smapes, kf_time = eval_local_model(air_train, air_test, KalmanForecaster, dim_x=12)
computing sMAPEs...
Comparing models¶
Below, we define a function that will help us visualise how the models compare to each other, in terms of median sMAPE and time taken to obtain the forecasts.
[17]:
def plot_models(method_to_elapsed_times, method_to_smapes):
shapes = ["o", "s", "*"]
colors = ["tab:blue", "tab:orange", "tab:green", "tab:red", "tab:purple"]
styles = list(product(shapes, colors))
plt.figure(figsize=(6, 6), dpi=100)
for i, method in enumerate(method_to_elapsed_times.keys()):
t = method_to_elapsed_times[method]
s = styles[i]
plt.semilogx(
[t],
[np.median(method_to_smapes[method])],
s[0],
color=s[1],
label=method,
markersize=13,
)
plt.xlabel("elapsed time [s]")
plt.ylabel("median sMAPE over all series")
plt.legend(bbox_to_anchor=(1.4, 1.0), frameon=True);
[18]:
smapes = {
"naive-last": naive1_smapes,
"naive-seasonal": naive12_smapes,
"Exponential Smoothing": ets_smapes,
"Theta": theta_smapes,
"ARIMA": arima_smapes,
"Kalman Filter": kf_smapes,
}
elapsed_times = {
"naive-last": naive1_time,
"naive-seasonal": naive12_time,
"Exponential Smoothing": ets_time,
"Theta": theta_time,
"ARIMA": arima_time,
"Kalman Filter": kf_time,
}
plot_models(elapsed_times, smapes)
Conclusions so far¶
ARIMA gives the best results, but it is also (by far) the most time-consuming model. Exponential smoothing offers an interesting trade-off, with good forecasting accuracy and a lower computational cost. Could we maybe find a better compromise by considering global models - i.e., models trained only once, jointly on all the time series?
Part 2: Global models on the ``air`` dataset¶
In this section, we will use "global models" - i.e., models that are fit on multiple series at once. Darts contains essentially two kinds of global models:
``RegressionModel``s, which are wrappers around sklearn-like regression models (Part 2.1).
PyTorch-based models, which offer various deep learning models (Part 2.2).
Both kinds of models can be trained on multiple series by "tabularizing" the data - i.e., extracting many (input, output) sub-slices from all the training series, and training the machine learning model in a supervised fashion to predict the output based on the input. A toy sketch of this idea is shown below.
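The following is a purely illustrative sketch of such a tabularization on plain NumPy arrays (this is not how Darts implements it internally, and the helper name is made up):
import numpy as np

# Illustrative only: build a tabular training set from several series
# by extracting (input, output) windows.
def tabularize(series_values, lags=3, output_len=1):
    X, y = [], []
    for values in series_values:
        for i in range(lags, len(values) - output_len + 1):
            X.append(values[i - lags : i])        # input window ("lags")
            y.append(values[i : i + output_len])  # output window
    return np.array(X), np.array(y)

# two toy "series"
s1 = np.arange(10, dtype=float)
s2 = np.arange(100, 108, dtype=float)
X, y = tabularize([s1, s2], lags=3, output_len=1)
print(X.shape, y.shape)  # (12, 3) (12, 1)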
We start by defining a function eval_global_model(), which works similarly to eval_local_model(), but on global models.
[19]:
def eval_global_model(
train_series: List[TimeSeries], test_series: List[TimeSeries], model_cls, **kwargs
) -> Tuple[List[float], float]:
start_time = time.time()
model = model_cls(**kwargs)
model.fit(train_series)
preds = model.predict(n=HORIZON, series=train_series)
elapsed_time = time.time() - start_time
smapes = eval_forecasts(preds, test_series)
return smapes, elapsed_time
Part 2.1: Using Darts ``RegressionModel``s¶
``RegressionModel``s in Darts are forecasting models that can wrap around any "scikit-learn compatible" regression model to obtain forecasts. Compared to deep learning, they represent good "go-to" global models because they typically have few hyperparameters and are faster to train. In addition, Darts also offers some "pre-packaged" regression models such as ``LinearRegressionModel`` and ``LightGBMModel``.
We will now use our eval_global_model() function. In the next cells, we will try a few regression models, such as:
LinearRegressionModel
LightGBMModel
RegressionModel(some_sklearn_model)
You can refer to the API documentation to see how to use them.
The important parameters are ``lags`` and ``output_chunk_length``. They determine, respectively, the length of the lookback and "lookahead" windows used by the model, and they correspond to the lengths of the input/output sub-slices used for training. For instance, ``lags=24`` and ``output_chunk_length=12`` mean that the model will consume the past 24 lags in order to predict the next 12 values. In our case, because the shortest training series has length 36, we must have ``lags + output_chunk_length <= 36``. (Note that ``lags`` can also be a list of integers representing the individual lag values to be consumed by the model, rather than a window length.)
[20]:
lr_smapes, lr_time = eval_global_model(
air_train, air_test, LinearRegressionModel, lags=30, output_chunk_length=1
)
computing sMAPEs...
[21]:
lgbm_smapes, lgbm_time = eval_global_model(
air_train, air_test, LightGBMModel, lags=30, output_chunk_length=1, objective="mape"
)
[LightGBM] [Warning] Some label values are < 1 in absolute value. MAPE is unstable with such values, so LightGBM rounds them to 1.0 when calculating MAPE.
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.003476 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 7649
[LightGBM] [Info] Number of data points in the train set: 30397, number of used features: 30
[LightGBM] [Info] Start training from score 0.481005
computing sMAPEs...
[22]:
rf_smapes, rf_time = eval_global_model(
air_train, air_test, RandomForest, lags=30, output_chunk_length=1
)
computing sMAPEs...
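In the same way, any "scikit-learn compatible" regressor can be wrapped in a ``RegressionModel``. As a sketch (the hyperparameters are arbitrary, and this variant is not included in the comparison plots below), a Ridge regressor - imported at the top of the notebook - could be evaluated like this:
ridge_smapes, ridge_time = eval_global_model(
    air_train, air_test, RegressionModel, lags=30, output_chunk_length=1, model=Ridge()
)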
Part 2.2: Using deep learning¶
Below, we will train an N-BEATS model on our ``air`` dataset. Again, you can refer to the API documentation for documentation on the hyperparameters. The hyperparameters below should be a reasonable starting point; training should take roughly one to two minutes if you are using a (somewhat slow) Colab GPU.
While it trains, you can have a look at the N-BEATS paper.
[23]:
### Possible N-BEATS hyper-parameters
# Slicing hyper-params:
IN_LEN = 30
OUT_LEN = 4
# Architecture hyper-params:
NUM_STACKS = 20
NUM_BLOCKS = 1
NUM_LAYERS = 2
LAYER_WIDTH = 136
COEFFS_DIM = 11
# Training settings:
LR = 1e-3
BATCH_SIZE = 1024
MAX_SAMPLES_PER_TS = 10
NUM_EPOCHS = 10
Let's now build, train, and predict with the N-BEATS model:
[24]:
# reproducibility
np.random.seed(42)
torch.manual_seed(42)
start_time = time.time()
nbeats_model_air = NBEATSModel(
input_chunk_length=IN_LEN,
output_chunk_length=OUT_LEN,
num_stacks=NUM_STACKS,
num_blocks=NUM_BLOCKS,
num_layers=NUM_LAYERS,
layer_widths=LAYER_WIDTH,
expansion_coefficient_dim=COEFFS_DIM,
loss_fn=SmapeLoss(),
batch_size=BATCH_SIZE,
optimizer_kwargs={"lr": LR},
pl_trainer_kwargs={
"enable_progress_bar": True,
# change this one to "gpu" if your notebook does run in a GPU environment:
"accelerator": "cpu",
},
)
nbeats_model_air.fit(air_train, num_loader_workers=4, epochs=NUM_EPOCHS)
# get predictions
nb_preds = nbeats_model_air.predict(series=air_train, n=HORIZON)
nbeats_elapsed_time = time.time() - start_time
nbeats_smapes = eval_forecasts(nb_preds, air_test)
GPU available: True (mps), used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
| Name | Type | Params
---------------------------------------------------
0 | criterion | SmapeLoss | 0
1 | train_metrics | MetricCollection | 0
2 | val_metrics | MetricCollection | 0
3 | stacks | ModuleList | 525 K
---------------------------------------------------
523 K Trainable params
1.9 K Non-trainable params
525 K Total params
2.102 Total estimated model params size (MB)
`Trainer.fit` stopped: `max_epochs=10` reached.
GPU available: True (mps), used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
computing sMAPEs...
Let's now look at our error vs. time plot again:
[25]:
smapes_2 = {
**smapes,
**{
"Linear Regression": lr_smapes,
"LGBM": lgbm_smapes,
"Random Forest": rf_smapes,
"NBeats": nbeats_smapes,
},
}
elapsed_times_2 = {
**elapsed_times,
**{
"Linear Regression": lr_time,
"LGBM": lgbm_time,
"Random Forest": rf_time,
"NBeats": nbeats_elapsed_time,
},
}
plot_models(elapsed_times_2, smapes_2)
Conclusions so far¶
So it looks like the linear regression model, trained jointly on all series, now provides the best tradeoff between accuracy and speed. Linear regression is often the go-to method to try first!
Our deep learning model N-BEATS does not perform very well. Note that we have not tried to tune it to this problem specifically, and doing so would likely yield more accurate results. Rather than spending time tuning it, though, in the next section we will see whether it can do better by being trained on a completely different dataset.
Part 3: Training an N-BEATS model on the ``m4`` dataset and using it to forecast the ``air`` dataset¶
Deep learning models often perform better when trained on large datasets. Let's try to load all 48,000 monthly time series contained in the M4 dataset and train our model again on this larger dataset.
[26]:
m4_train, m4_test = load_m4()
There are 47087 monthly series in the M4 dataset
scaling...
done. There are 47087 series, with average training length 201.25202285131778
We can start with the same hyperparameters as before.
With 48,000 M4 training series of roughly 200 time steps each on average, we would end up with roughly 10 million training samples. With that many samples, each epoch would take too long. So here we will limit the number of training samples used per series. This is done with the ``max_samples_per_ts`` parameter when calling ``fit()``. We add a new hyperparameter ``MAX_SAMPLES_PER_TS`` to capture this.
Since the M4 training series are all a bit longer, we can also use a slightly longer ``input_chunk_length``.
[27]:
# Slicing hyper-params:
IN_LEN = 36
OUT_LEN = 4
# Architecture hyper-params:
NUM_STACKS = 20
NUM_BLOCKS = 1
NUM_LAYERS = 2
LAYER_WIDTH = 136
COEFFS_DIM = 11
# Training settings:
LR = 1e-3
BATCH_SIZE = 1024
MAX_SAMPLES_PER_TS = (
10 # <-- new parameter, limiting the number of training samples per series
)
NUM_EPOCHS = 5
[28]:
# reproducibility
np.random.seed(42)
torch.manual_seed(42)
nbeats_model_m4 = NBEATSModel(
input_chunk_length=IN_LEN,
output_chunk_length=OUT_LEN,
batch_size=BATCH_SIZE,
num_stacks=NUM_STACKS,
num_blocks=NUM_BLOCKS,
num_layers=NUM_LAYERS,
layer_widths=LAYER_WIDTH,
expansion_coefficient_dim=COEFFS_DIM,
loss_fn=SmapeLoss(),
optimizer_kwargs={"lr": LR},
pl_trainer_kwargs={
"enable_progress_bar": True,
# change this one to "gpu" if your notebook does run in a GPU environment:
"accelerator": "cpu",
},
)
# Train
nbeats_model_m4.fit(
m4_train,
num_loader_workers=4,
epochs=NUM_EPOCHS,
max_samples_per_ts=MAX_SAMPLES_PER_TS,
)
GPU available: True (mps), used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
| Name | Type | Params
---------------------------------------------------
0 | criterion | SmapeLoss | 0
1 | train_metrics | MetricCollection | 0
2 | val_metrics | MetricCollection | 0
3 | stacks | ModuleList | 543 K
---------------------------------------------------
541 K Trainable params
1.9 K Non-trainable params
543 K Total params
2.173 Total estimated model params size (MB)
`Trainer.fit` stopped: `max_epochs=5` reached.
[28]:
NBEATSModel(generic_architecture=True, num_stacks=20, num_blocks=1, num_layers=2, layer_widths=136, expansion_coefficient_dim=11, trend_polynomial_degree=2, dropout=0.0, activation=ReLU, input_chunk_length=36, output_chunk_length=4, batch_size=1024, loss_fn=SmapeLoss(), optimizer_kwargs={'lr': 0.001}, pl_trainer_kwargs={'enable_progress_bar': True, 'accelerator': 'cpu'})
We can now use our M4-trained model to forecast the air passenger series. Since we are using the model in a "meta-learning" (or transfer learning) way, we will time the inference part only.
[29]:
start_time = time.time()
preds = nbeats_model_m4.predict(series=air_train, n=HORIZON) # get forecasts
nbeats_m4_elapsed_time = time.time() - start_time
nbeats_m4_smapes = eval_forecasts(preds, air_test)
GPU available: True (mps), used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
computing sMAPEs...
[30]:
smapes_3 = {**smapes_2, **{"N-BEATS (M4-trained)": nbeats_m4_smapes}}
elapsed_times_3 = {
**elapsed_times_2,
**{"N-BEATS (M4-trained)": nbeats_m4_elapsed_time},
}
plot_models(elapsed_times_3, smapes_3)
Conclusions so far¶
Although not the absolute best in terms of accuracy, our ``m4``-trained N-BEATS model reaches competitive accuracy. This is quite remarkable, because this model was *not* trained on *any* of the ``air`` series we asked it to forecast! The forecasting step with N-BEATS is more than 1000 times faster than the fit-predict step we needed with ARIMA, and faster than linear regression too.
For fun, we can also manually inspect how this model does on another series - for instance, the monthly milk production series available in ``darts.datasets``:
[31]:
from darts.datasets import MonthlyMilkDataset
series = MonthlyMilkDataset().load().astype(np.float32)
train, val = series[:-24], series[-24:]
scaler = Scaler(scaler=MaxAbsScaler())
train = scaler.fit_transform(train)
val = scaler.transform(val)
series = scaler.transform(series)
pred = nbeats_model_m4.predict(series=train, n=24)
series.plot(label="actual")
pred.plot(label="0-shot forecast")
GPU available: True (mps), used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
[31]:
<Axes: xlabel='Month'>
Trying other global models trained on ``m4`` and applied to the air passengers series¶
Let's now try training other global models on the M4 dataset, to see whether we obtain similar results. Below, we will train some ``RegressionModel``s on the full ``m4`` dataset. This can be quite slow. To speed up training, we could for instance use ``random.choices(m4_train, k=5000)`` instead of ``m4_train`` to limit the size of the training set. We could also specify a small enough value for ``max_samples_per_ts`` in order to limit the number of training samples; the sketch below shows what this could look like.
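As a hedged sketch of that speed-up (this is not what is run in the cells below, which use the full dataset, and the variable names are made up):
# train on a random subset of 5,000 M4 series, capping the samples drawn per series
subset_m4 = random.choices(m4_train, k=5000)
fast_lr_model_m4 = LinearRegressionModel(lags=30, output_chunk_length=1)
fast_lr_model_m4.fit(subset_m4, max_samples_per_ts=10)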
[32]:
random.seed(42)
lr_model_m4 = LinearRegressionModel(lags=30, output_chunk_length=1)
lr_model_m4.fit(m4_train)
tic = time.time()
preds = lr_model_m4.predict(n=HORIZON, series=air_train)
lr_time_transfer = time.time() - tic
lr_smapes_transfer = eval_forecasts(preds, air_test)
computing sMAPEs...
[33]:
random.seed(42)
lgbm_model_m4 = LightGBMModel(lags=30, output_chunk_length=1, objective="mape")
lgbm_model_m4.fit(m4_train)
tic = time.time()
preds = lgbm_model_m4.predict(n=HORIZON, series=air_train)
lgbm_time_transfer = time.time() - tic
lgbm_smapes_transfer = eval_forecasts(preds, air_test)
[LightGBM] [Warning] Some label values are < 1 in absolute value. MAPE is unstable with such values, so LightGBM rounds them to 1.0 when calculating MAPE.
[LightGBM] [Info] Auto-choosing row-wise multi-threading, the overhead of testing was 0.097573 seconds.
You can set `force_row_wise=true` to remove the overhead.
And if memory is not enough, you can set `force_col_wise=true`.
[LightGBM] [Info] Total Bins 7650
[LightGBM] [Info] Number of data points in the train set: 8063744, number of used features: 30
[LightGBM] [Info] Start training from score 0.722222
computing sMAPEs...
Finally, let's plot these new results as well:
[34]:
smapes_4 = {
**smapes_3,
**{
"Linear Reg (M4-trained)": lr_smapes_transfer,
"LGBM (M4-trained)": lgbm_smapes_transfer,
},
}
elapsed_times_4 = {
**elapsed_times_3,
**{
"Linear Reg (M4-trained)": lr_time_transfer,
"LGBM (M4-trained)": lgbm_time_transfer,
},
}
plot_models(elapsed_times_4, smapes_4)
Part 4, and recap: Using the same models on the M3 dataset¶
OK, so were we simply lucky with the air passengers dataset? Let's find out by repeating the whole process on a new dataset :) You will see that it actually takes very few lines of code. As the new dataset, we will use ``m3``, which contains about 1,400 monthly series from the M3 competition.
[35]:
m3_train, m3_test = load_m3()
building M3 TimeSeries...
There are 1399 monthly series in the M3 dataset
splitting train/test...
scaling...
done. There are 1399 series, with average training length 100.30092923516797
[36]:
naive1_smapes_m3, naive1_time_m3 = eval_local_model(
m3_train, m3_test, NaiveSeasonal, K=1
)
computing sMAPEs...
[37]:
naive12_smapes_m3, naive12_time_m3 = eval_local_model(
m3_train, m3_test, NaiveSeasonal, K=12
)
computing sMAPEs...
[38]:
ets_smapes_m3, ets_time_m3 = eval_local_model(m3_train, m3_test, ExponentialSmoothing)
computing sMAPEs...
[39]:
theta_smapes_m3, theta_time_m3 = eval_local_model(m3_train, m3_test, Theta)
computing sMAPEs...
[40]:
warnings.filterwarnings("ignore") # ARIMA generates lots of warnings
# Note: using q=1 here generates errors for some series, so we use q=0
arima_smapes_m3, arima_time_m3 = eval_local_model(
m3_train, m3_test, ARIMA, p=12, d=1, q=0
)
computing sMAPEs...
[41]:
kf_smapes_m3, kf_time_m3 = eval_local_model(
m3_train, m3_test, KalmanForecaster, dim_x=12
)
computing sMAPEs...
[42]:
lr_smapes_m3, lr_time_m3 = eval_global_model(
m3_train, m3_test, LinearRegressionModel, lags=30, output_chunk_length=1
)
computing sMAPEs...
[43]:
lgbm_smapes_m3, lgbm_time_m3 = eval_global_model(
m3_train, m3_test, LightGBMModel, lags=35, output_chunk_length=1, objective="mape"
)
[LightGBM] [Warning] Some label values are < 1 in absolute value. MAPE is unstable with such values, so LightGBM rounds them to 1.0 when calculating MAPE.
[LightGBM] [Info] Auto-choosing col-wise multi-threading, the overhead of testing was 0.004819 seconds.
You can set `force_col_wise=true` to remove the overhead.
[LightGBM] [Info] Total Bins 8925
[LightGBM] [Info] Number of data points in the train set: 91356, number of used features: 35
[LightGBM] [Info] Start training from score 0.771293
computing sMAPEs...
[44]:
# Get forecasts with our pre-trained N-BEATS
start_time = time.time()
preds = nbeats_model_m4.predict(series=m3_train, n=HORIZON) # get forecasts
nbeats_m4_elapsed_time_m3 = time.time() - start_time
nbeats_m4_smapes_m3 = eval_forecasts(preds, m3_test)
GPU available: True (mps), used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs
computing sMAPEs...
[45]:
# Get forecasts with our pre-trained linear regression model
start_time = time.time()
preds = lr_model_m4.predict(series=m3_train, n=HORIZON) # get forecasts
lr_m4_elapsed_time_m3 = time.time() - start_time
lr_m4_smapes_m3 = eval_forecasts(preds, m3_test)
computing sMAPEs...
[46]:
# Get forecasts with our pre-trained LightGBM model
start_time = time.time()
preds = lgbm_model_m4.predict(series=m3_train, n=HORIZON) # get forecasts
lgbm_m4_elapsed_time_m3 = time.time() - start_time
lgbm_m4_smapes_m3 = eval_forecasts(preds, m3_test)
computing sMAPEs...
[47]:
smapes_m3 = {
"naive-last": naive1_smapes_m3,
"naive-seasonal": naive12_smapes_m3,
"Exponential Smoothing": ets_smapes_m3,
"ARIMA": arima_smapes_m3,
"Theta": theta_smapes_m3,
"Kalman filter": kf_smapes_m3,
"Linear Regression": lr_smapes_m3,
"LGBM": lgbm_smapes_m3,
"N-BEATS (M4-trained)": nbeats_m4_smapes_m3,
"Linear Reg (M4-trained)": lr_m4_smapes_m3,
"LGBM (M4-trained)": lgbm_m4_smapes_m3,
}
times_m3 = {
"naive-last": naive1_time_m3,
"naive-seasonal": naive12_time_m3,
"Exponential Smoothing": ets_time_m3,
"ARIMA": arima_time_m3,
"Theta": theta_time_m3,
"Kalman filter": kf_time_m3,
"Linear Regression": lr_time_m3,
"LGBM": lgbm_time_m3,
"N-BEATS (M4-trained)": nbeats_m4_elapsed_time_m3,
"Linear Reg (M4-trained)": lr_m4_elapsed_time_m3,
"LGBM (M4-trained)": lgbm_m4_elapsed_time_m3,
}
plot_models(times_m3, smapes_m3)
Here too, the pre-trained N-BEATS model obtains reasonable accuracy, although not as good as the most accurate models. Note also that exponential smoothing and the Kalman filter are now doing much better than when we used them on the air passenger series. ARIMA performs best, but is roughly 1000 times slower than N-BEATS, which requires no training and takes on the order of 15 milliseconds per time series to produce its forecasts. Recall that this N-BEATS model was never trained on any of the series we asked it to forecast.
Conclusions¶
Transfer learning and meta-learning are certainly interesting phenomena that are currently under-explored in time series forecasting. When do they succeed? When do they fail? Can fine-tuning help? When should they be used? Many of these questions still have to be explored, but we hope to have shown that doing so is quite easy with Darts models.
Now, which method is best for your case? As always, it depends. If you are mostly dealing with isolated series that have a sufficiently long history, classical methods such as ARIMA will get you a long way. Even on larger datasets, if compute power is not too much of an issue, they can represent interesting out-of-the-box options. On the other hand, if you are dealing with a large number of series, or with high-dimensional series, machine learning methods and global models will often be the way to go. They can capture patterns across wide ranges of different time series, and are usually faster to run. Don't underestimate linear regression models in this category! If you have reasons to believe you need to capture more complex patterns, or if inference speed *really* matters to you, give deep learning methods a shot. N-BEATS has proved its worth for meta-learning [1], but this can potentially work with other models too.
[1] Oreshkin et al., "Meta-learning framework with applications to zero-shot time-series forecasting", 2020, https://arxiv.org/abs/2002.02887