块状集成方法
内容
实时笔记本
你可以在 live session 中运行此笔记本,或查看 Github 上的内容。
块状集成方法¶
Dask-ML 提供了一些针对 dask.array
和 dask.dataframe
分块结构的 集成方法。基本思想是为 dask Array 或 DataFrame 的每个块(或分区)拟合一个子估计器的副本。因为每个块都适合内存,子估计器只需要处理像 NumPy 数组或 pandas DataFrame 这样的内存数据结构。由于每个块都适合内存,并且我们不需要在集群的工作节点之间移动大量数据,因此它也会相对较快。最终我们得到一个模型集合:每个训练数据集的块对应一个模型。
在预测时,我们将集成中所有模型的结果结合起来。对于回归问题,这意味着对每个子估计器的预测进行平均。对于分类问题,每个子估计器进行投票,结果被合并。有关如何合并的详细信息,请参见 https://scikit-learn.org/stable/modules/ensemble.html#voting-classifier 。有关平均集成方法为何有用的概述,请参见 https://scikit-learn.org/stable/modules/ensemble.html 。
确保数据集中值的分布在各个分区中相对均匀至关重要。否则,在任何给定数据分区上学习的参数对于整个数据集来说都会很差。这一点将在后面详细展示。
让我们随机生成一个示例数据集。在实际操作中,您会从存储中加载数据。我们将创建一个包含10个块的 dask.array
。
[ ]:
from distributed import Client
import dask_ml.datasets
import dask_ml.ensemble
client = Client(n_workers=4, threads_per_worker=1)
X, y = dask_ml.datasets.make_classification(n_samples=1_000_000,
n_informative=10,
shift=2, scale=2,
chunks=100_000)
X
分类¶
sub-estimator
应该是一个实例化的 scikit-learn-API 兼容的估计器(任何实现了 fit
/ predict
API 的对象,包括管道)。它只需要处理内存中的数据集。我们将使用 sklearn.linear_model.RidgeClassifier
。
为了获得正确的输出形状,我们要求您在创建估计器时或在子估计器也需要类别时在 .fit
中提供分类问题的 classes
。
[ ]:
import sklearn.linear_model
subestimator = sklearn.linear_model.RidgeClassifier(random_state=0)
clf = dask_ml.ensemble.BlockwiseVotingClassifier(
subestimator,
classes=[0, 1]
)
clf
我们可以正常训练。这将 独立地 在 X
和 y
的每个分区上拟合 subestimator
的一个副本。
[ ]:
clf.fit(X, y)
所有拟合的估计器都可以在 .estimators_
中找到。
[ ]:
clf.estimators_
这些是不同的估计器!它们已经在不同的数据批次上进行了训练,并学习了不同的参数。我们可以绘制前两个模型学习到的 coef_
的差异来可视化这一点。
[ ]:
import matplotlib.pyplot as plt
import numpy as np
[ ]:
a = clf.estimators_[0].coef_
b = clf.estimators_[1].coef_
fig, ax = plt.subplots()
ax.bar(np.arange(a.shape[1]), (a - b).ravel())
ax.set(xticks=[], xlabel="Feature", title="Difference in Learned Coefficients");
也就是说,整个过程的假设是数据在分区之间的分布相对均匀。集成学习中每个成员学到的参数应该相对相似,因此在应用于相同数据时会给出相对相似的预测。
当你 predict
时,结果将具有与你要预测的输入数组相同的分块模式(这不需要与训练数据的分区匹配)。
[ ]:
preds = clf.predict(X)
preds
这将生成一组任务,
对每个子估计器(在我们的例子中是10个)调用
subestimator.predict(chunk)
将这些预测连接在一起
以某种方式将预测平均为一个总体预测
我们使用了默认的 voting="hard"
策略,这意味着我们只是选择得票数最高的类别。如果前两个子估计器为第一行选择了类别 0
,而其他八个选择了类别 1
,那么该行的最终预测将是类别 1
。
[ ]:
preds[:10].compute()
使用 voting="soft"
我们可以访问 predict_proba
,只要子估计器有 predict_proba
方法。这些子估计器应该经过良好校准,以使预测有意义。更多信息请参见 概率校准。
[ ]:
subestimator = sklearn.linear_model.LogisticRegression(random_state=0)
clf = dask_ml.ensemble.BlockwiseVotingClassifier(
subestimator,
classes=[0, 1],
voting="soft"
)
clf.fit(X, y)
[ ]:
proba = clf.predict_proba(X)
proba[:5].compute()
这里的阶段与 voting="hard"
的情况类似。现在我们不是取多数投票,而是取每个子估计器预测的概率的平均值。
回归¶
回归非常相似。主要区别在于没有投票;估计器的预测总是通过平均来减少。
[ ]:
X, y = dask_ml.datasets.make_regression(n_samples=1_000_000,
chunks=100_000,
n_features=20)
X
[ ]:
subestimator = sklearn.linear_model.LinearRegression()
clf = dask_ml.ensemble.BlockwiseVotingRegressor(
subestimator,
)
clf.fit(X, y)
[ ]:
clf.predict(X)[:5].compute()
与 Dask-ML 一样,评分是并行完成的(如果你连接到一个集群,则分布在集群上)。
[ ]:
clf.score(X, y)
非均匀分布数据的风险¶
最后,必须再次强调,在使用这些集成方法之前,您的数据应在分区之间均匀分布。如果不是,那么您最好从每个分区中抽取行,并将其拟合到一个单一的分类器中。我们所说的“均匀”并不是指“来自均匀概率分布”。只是数据分布不应该有明显的分区模式。
让我们通过一个例子来演示这一点。我们将生成一个在分区之间具有明显趋势的数据集。这可能代表一些非平稳的时间序列,尽管它也可能出现在其他上下文中(例如按地理、年龄等分区的数据)。
[ ]:
import dask.array as da
import dask.delayed
import sklearn.datasets
[ ]:
def clone_and_shift(X, y, i):
X = X.copy()
X += i + np.random.random(X.shape)
y += 25 * (i + np.random.random(y.shape))
return X, y
[ ]:
# Make a base dataset that we'll clone and shift
X, y = sklearn.datasets.make_regression(n_features=4, bias=2, random_state=0)
# Clone and shift 10 times, gradually increasing X and y for each partition
Xs, ys = zip(*[dask.delayed(clone_and_shift, nout=2)(X, y, i) for i in range(10)])
Xs = [da.from_delayed(x, shape=X.shape, dtype=X.dtype) for x in Xs]
ys = [da.from_delayed(y_, shape=y.shape, dtype=y.dtype) for y_ in ys]
X2 = da.concatenate(Xs)
y2 = da.concatenate(ys)
让我们绘制一些点的样本,根据数据来自哪个分区进行着色。
[ ]:
fig, ax = plt.subplots()
ax.scatter(X2[::5, 0], y2[::5], c=np.arange(0, len(X2), 5) // 100, cmap="Set1",
label="Partition")
ax.set(xlabel="Feature 0", ylabel="target", title="Non-stationary data (by partition)");
现在让我们拟合两个估计器:
在整个数据集上使用一个
BlockwiseVotingRegressor``(它在每个分区上拟合一个 ``LinearRegression
)在整个数据集的一个样本上进行
LinearRegression
[ ]:
subestimator = sklearn.linear_model.LinearRegression()
clf = dask_ml.ensemble.BlockwiseVotingRegressor(
subestimator,
)
clf.fit(X2, y2)
[ ]:
X_sampled, y_sampled = dask.compute(X2[::10], y2[::10])
subestimator.fit(X_sampled, y_sampled)
比较分数后,我们发现尽管在较少的数据上进行训练,采样的数据集表现要好得多。
[ ]:
clf.score(X2, y2)
[ ]:
subestimator.score(X2, y2)
这表明确保您的需求在分区之间相对均匀。即使包括标准控制来规范化生成非平稳数据的基础力量(例如,时间趋势分量或差分时序数据,地理区域的虚拟变量等),当您的数据集按非均匀变量分区时,这些控制也不足以解决问题。您仍然需要在拟合之前打乱数据,或者仅对适合内存的子样本进行采样并拟合子估计器。