管道#
Pipeline
有助于将多个估计器链接到一个统一的序列中。这非常有利,因为数据处理通常涉及一系列预定义的操作,例如特征选择、归一化和训练机器学习模型。
Feature-engine 的 Pipeline
与 scikit-learn 的 Pipeline 不同,我们的 Pipeline
支持从数据集中删除行的转换器,如 DropMissingData
、OutlierTrimmer
、LagFeatures
和 WindowFeatures
。
当从训练数据集中移除观测值时,Pipeline
会调用这些转换器中可用的 transform_x_y
方法,以调整目标变量以适应剩余的行。
在此上下文中,管道具有多种功能:
简单性和封装性:
你只需在数据上调用一次 fit
和 predict
函数,即可拟合一整套估计器序列。
超参数优化:
网格搜索和随机搜索可以同时对管道中所有估计器的超参数进行。
安全
使用管道可以防止在交叉验证期间,测试数据的统计信息泄露到训练模型中,通过确保使用相同的数据来拟合转换器和预测器。
管道函数#
在管道上调用 fit
函数,与依次对每个单独的估计器调用 fit
相同,它会转换输入数据并将其传递给后续步骤。
管道将包含最终估计器中的所有方法。例如,如果最后一个估计器是一个分类器,那么管道可以作为一个分类器来工作。同样,如果最后一个估计器是一个转换器,管道也会继承这个功能。
设置一个流水线#
通过使用一系列 (键, 值) 对来构造 Pipeline
,其中键表示步骤的期望名称,而值表示一个估计器或转换器对象。
在下面的例子中,我们设置了一个 Pipeline
,它首先删除缺失的数据,然后用序数替换类别,最后拟合一个Lasso回归模型。
import numpy as np
import pandas as pd
from feature_engine.imputation import DropMissingData
from feature_engine.encoding import OrdinalEncoder
from feature_engine.pipeline import Pipeline
from sklearn.linear_model import Lasso
X = pd.DataFrame(
dict(
x1=[2, 1, 1, 0, np.nan],
x2=["a", np.nan, "b", np.nan, "a"],
)
)
y = pd.Series([1, 2, 3, 4, 5])
pipe = Pipeline(
[
("drop", DropMissingData()),
("enc", OrdinalEncoder(encoding_method="arbitrary")),
("lasso", Lasso(random_state=10)),
]
)
# predict
pipe.fit(X, y)
preds_pipe = pipe.predict(X)
preds_pipe
在输出中,我们可以看到管道所做的预测:
array([2., 2.])
访问流水线步骤#
在 Pipeline
中,估计器存储在 steps
属性中作为一个列表。我们可以使用切片符号来获取 Pipeline 中的子集或部分 Pipeline。这一功能对于选择性地执行特定的转换或其逆变换非常有用。
例如,这种表示法提取了管道的第一个步骤:
pipe[:1]
Pipeline(steps=[('drop', DropMissingData())])
此表示法提取了管道的**前两个**步骤:
pipe[:2]
Pipeline(steps=[('drop', DropMissingData()),
('enc', OrdinalEncoder(encoding_method='arbitrary'))])
此表示法提取管道的最后一步:
pipe[-1:]
Pipeline(steps=[('lasso', Lasso(random_state=10))])
我们也可以选择管道的特定步骤来检查它们的属性。例如,我们可以如下检查Lasso算法的系数:
pipe.named_steps["lasso"].coef_
我们看到系数:
array([-0., 0.])
目标与变量之间没有关系,因此获取这些系数是可以的。
让我们检查分类变量的序数编码映射:
pipe.named_steps["enc"].encoder_dict_
我们看到整数被用来替换每个类别:
{'x2': {'a': 0, 'b': 1}}
在 Pipeline 中查找特征名称#
该 Pipeline
包含一个 get_feature_names_out()
方法,类似于其他转换器。通过使用管道切片,您可以获取进入每个步骤的特征名称。
让我们设置一个管道,为数据集添加新特征,使这更有趣:
import numpy as np
import pandas as pd
from feature_engine.imputation import DropMissingData
from feature_engine.encoding import OneHotEncoder
from feature_engine.pipeline import Pipeline
from sklearn.linear_model import Lasso
X = pd.DataFrame(
dict(
x1=[2, 1, 1, 0, np.nan],
x2=["a", np.nan, "b", np.nan, "a"],
)
)
y = pd.Series([1, 2, 3, 4, 5])
pipe = Pipeline(
[
("drop", DropMissingData()),
("enc", OneHotEncoder()),
("lasso", Lasso(random_state=10)),
]
)
pipe.fit(X, y)
在管道的第一个步骤中,没有添加任何特征,我们只是删除了包含 nan
的行。因此,如果我们执行 get_feature_names_out()
,我们应该只会看到输入数据框中的两个变量:
pipe[:1].get_feature_names_out()
['x1', 'x2']
在第二步中,我们为 x2 的每个类别添加二进制变量,因此 x2 应该消失,取而代之的是二进制变量:
pipe[:2].get_feature_names_out()
['x1', 'x2_a', 'x2_b']
最后一步是一个估计器,即一个机器学习模型。估计器不支持 get_feature_names_out()
方法。因此,如果我们对整个管道应用此方法,将会得到一个错误。
访问嵌套参数#
我们可以在管道中重新定义或重置转换器和估计器的参数。这是由网格搜索和随机搜索在幕后完成的。但如果你需要在 Pipeline
的某个步骤中更改参数,可以这样做:
pipe.set_params(lasso__alpha=10)
在这里,我们将套索回归算法的alpha值更改为10。
最佳用途:在数据预处理期间删除行#
Feature-engine 的 Pipeline
被设计用来支持那些会从数据集中移除行的转换器,例如 DropMissingData
、OutlierTrimmer
、LagFeatures
和 WindowFeatures
。
我们在本页前面看到了如何将 Pipeline
与 DropMissingData
一起使用。现在让我们看看如何将 Pipeline
与 LagFeatures
和 WindowFeaures
结合使用来进行多步预测。
我们从导入开始:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.linear_model import Lasso
from sklearn.metrics import root_mean_squared_error
from sklearn.multioutput import MultiOutputRegressor
from feature_engine.timeseries.forecasting import (
LagFeatures,
WindowFeatures,
)
from feature_engine.pipeline import Pipeline
我们将使用此处描述的澳大利亚电力需求数据集:
Godahewa, Rakshitha, Bergmeir, Christoph, Webb, Geoff, Hyndman, Rob, & Montero-Manso, Pablo. (2021). 澳大利亚电力需求数据集 (版本 1) [数据集]. Zenodo. https://doi.org/10.5281/zenodo.4659727
url = "https://raw.githubusercontent.com/tidyverts/tsibbledata/master/data-raw/vic_elec/VIC2015/demand.csv"
df = pd.read_csv(url)
df.drop(columns=["Industrial"], inplace=True)
# Convert the integer Date to an actual date with datetime type
df["date"] = df["Date"].apply(
lambda x: pd.Timestamp("1899-12-30") + pd.Timedelta(x, unit="days")
)
# Create a timestamp from the integer Period representing 30 minute intervals
df["date_time"] = df["date"] + \
pd.to_timedelta((df["Period"] - 1) * 30, unit="m")
df.dropna(inplace=True)
# Rename columns
df = df[["date_time", "OperationalLessIndustrial"]]
df.columns = ["date_time", "demand"]
# Resample to hourly
df = (
df.set_index("date_time")
.resample("h")
.agg({"demand": "sum"})
)
print(df.head())
这里,我们看到了数据的前几行:
demand
date_time
2002-01-01 00:00:00 6919.366092
2002-01-01 01:00:00 7165.974188
2002-01-01 02:00:00 6406.542994
2002-01-01 03:00:00 5815.537828
2002-01-01 04:00:00 5497.732922
我们将预测未来6小时的能源需求。我们将使用直接预测法。因此,我们需要创建6个目标变量,每个变量对应预测范围内的每一步:
horizon = 6
y = pd.DataFrame(index=df.index)
for h in range(horizon):
y[f"h_{h}"] = df.shift(periods=-h, freq="h")
y.dropna(inplace=True)
df = df.loc[y.index]
print(y.head())
这是我们的目标变量:
h_0 h_1 h_2 h_3 \
date_time
2002-01-01 00:00:00 6919.366092 7165.974188 6406.542994 5815.537828
2002-01-01 01:00:00 7165.974188 6406.542994 5815.537828 5497.732922
2002-01-01 02:00:00 6406.542994 5815.537828 5497.732922 5385.851060
2002-01-01 03:00:00 5815.537828 5497.732922 5385.851060 5574.731890
2002-01-01 04:00:00 5497.732922 5385.851060 5574.731890 5457.770634
h_4 h_5
date_time
2002-01-01 00:00:00 5497.732922 5385.851060
2002-01-01 01:00:00 5385.851060 5574.731890
2002-01-01 02:00:00 5574.731890 5457.770634
2002-01-01 03:00:00 5457.770634 5698.152000
2002-01-01 04:00:00 5698.152000 5938.337614
接下来,我们将数据分为训练集和测试集:
end_train = '2014-12-31 23:59:59'
X_train = df.loc[:end_train]
y_train = y.loc[:end_train]
begin_test = '2014-12-31 17:59:59'
X_test = df.loc[begin_test:]
y_test = y.loc[begin_test:]
接下来,我们设置 LagFeatures
和 WindowFeatures
来从滞后和窗口创建特征:
lagf = LagFeatures(
variables=["demand"],
periods=[1, 2, 3, 4, 5, 6],
missing_values="ignore",
drop_na=True,
)
winf = WindowFeatures(
variables=["demand"],
window=["3h"],
freq="1h",
functions=["mean"],
missing_values="ignore",
drop_original=True,
drop_na=True,
)
我们将套索回归封装在多输出回归器中以预测多个目标:
lasso = MultiOutputRegressor(Lasso(random_state=0, max_iter=10))
现在,我们将 Pipeline
中的步骤组装起来,并将其拟合到训练数据上:
pipe = Pipeline(
[
("lagf", lagf),
("winf", winf),
("lasso", lasso),
]
).set_output(transform="pandas")
pipe.fit(X_train, y_train)
我们可以这样获取包含预测变量和目标变量的数据集:
Xt, yt = pipe[:-1].transform_x_y(X_test, y_test)
X_test.shape, y_test.shape, Xt.shape, yt.shape
我们看到 Pipeline
在转换过程中丢弃了一些行并重新调整了目标。被丢弃的行是创建第一个滞后所必需的。
((1417, 1), (1417, 6), (1410, 7), (1410, 6))
我们可以检查预测变量的训练集,以确保我们向回归模型传递了正确的变量:
print(Xt.head())
我们看到输入特征:
demand_lag_1 demand_lag_2 demand_lag_3 demand_lag_4 \
date_time
2015-01-01 01:00:00 7804.086240 8352.992140 7571.301440 7516.472988
2015-01-01 02:00:00 7174.339984 7804.086240 8352.992140 7571.301440
2015-01-01 03:00:00 6654.283364 7174.339984 7804.086240 8352.992140
2015-01-01 04:00:00 6429.598010 6654.283364 7174.339984 7804.086240
2015-01-01 05:00:00 6412.785284 6429.598010 6654.283364 7174.339984
demand_lag_5 demand_lag_6 demand_window_3h_mean
date_time
2015-01-01 01:00:00 7801.201802 7818.461408 7804.086240
2015-01-01 02:00:00 7516.472988 7801.201802 7489.213112
2015-01-01 03:00:00 7571.301440 7516.472988 7210.903196
2015-01-01 04:00:00 8352.992140 7571.301440 6752.740453
2015-01-01 05:00:00 7804.086240 8352.992140 6498.888886
现在,我们可以为测试集进行预测:
forecast = pipe.predict(X_test)
forecasts = pd.DataFrame(
pipe.predict(X_test),
index=Xt.loc[end_train:].index,
columns=[f"step_{i+1}" for i in range(6)]
)
print(forecasts.head())
我们看到了每小时的6小时前能源需求预测:
step_1 step_2 step_3 step_4 \
date_time
2015-01-01 01:00:00 7810.769000 7890.897914 8123.247406 8374.365708
2015-01-01 02:00:00 7049.673468 7234.890108 7586.593627 7889.608312
2015-01-01 03:00:00 6723.246357 7046.660134 7429.115933 7740.984091
2015-01-01 04:00:00 6639.543752 6962.661308 7343.941881 7616.240318
2015-01-01 05:00:00 6634.279747 6949.262247 7287.866893 7633.157948
step_5 step_6
date_time
2015-01-01 01:00:00 8569.220349 8738.027713
2015-01-01 02:00:00 8116.631154 8270.579148
2015-01-01 03:00:00 7937.918837 8170.531420
2015-01-01 04:00:00 7884.815566 8197.598425
2015-01-01 05:00:00 7979.920512 8321.363714
要了解更多关于直接预测以及如何创建特征的信息,请查看我们的课程:
超参数优化#
我们可以同时优化管道中转换器和估计器的超参数。
我们将从加载泰坦尼克号数据集开始:
from feature_engine.datasets import load_titanic
from feature_engine.encoding import OneHotEncoder
from feature_engine.outliers import OutlierTrimmer
from feature_engine.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.preprocessing import StandardScaler
X, y = load_titanic(
return_X_y_frame=True,
predictors_only=True,
handle_missing=True,
)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.3, random_state=0,
)
print(X_train.head())
我们看到了训练集的前5行如下:
pclass sex age sibsp parch fare cabin embarked
501 2 female 13.000000 0 1 19.5000 Missing S
588 2 female 4.000000 1 1 23.0000 Missing S
402 2 female 30.000000 1 0 13.8583 Missing C
1193 3 male 29.881135 0 0 7.7250 Missing Q
686 3 female 22.000000 0 0 7.7250 Missing Q
现在,我们设置一个管道:
pipe = Pipeline(
[
("outliers", OutlierTrimmer(variables=["age", "fare"])),
("enc", OneHotEncoder()),
("scaler", StandardScaler()),
("logit", LogisticRegression(random_state=10)),
]
)
我们建立超参数搜索空间:
param_grid={
'logit__C': [0.1, 10.],
'enc__top_categories': [None, 5],
'outliers__capping_method': ["mad", 'iqr']
}
我们进行网格搜索:
grid = GridSearchCV(
pipe,
param_grid=param_grid,
cv=2,
refit=False,
)
grid.fit(X_train, y_train)
我们可以看到每个步骤的最佳超参数:
grid.best_params_
{'enc__top_categories': None,
'logit__C': 0.1,
'outliers__capping_method': 'iqr'}
并且使用这些超参数获得的最佳准确率:
grid.best_score_
0.7843822843822843
附加资源#
要了解更多关于特征工程和数据预处理的内容,包括缺失数据插补、异常值移除或上限处理、变量转换和编码,请查看我们的在线课程和书籍:
或者阅读我们的书:
我们的书籍和课程都适合初学者和更高级的数据科学家。通过购买它们,您正在支持 Feature-engine 的主要开发者 Sole。