实时笔记本

你可以在 live session Binder 中运行此笔记本,或查看 Github 上的内容。

块状集成方法

Dask-ML 提供了一些针对 dask.arraydask.dataframe 分块结构的 集成方法。基本思想是为 dask Array 或 DataFrame 的每个块(或分区)拟合一个子估计器的副本。因为每个块都适合内存,子估计器只需要处理像 NumPy 数组或 pandas DataFrame 这样的内存数据结构。由于每个块都适合内存,并且我们不需要在集群的工作节点之间移动大量数据,因此它也会相对较快。最终我们得到一个模型集合:每个训练数据集的块对应一个模型。

在预测时,我们将集成中所有模型的结果结合起来。对于回归问题,这意味着对每个子估计器的预测进行平均。对于分类问题,每个子估计器进行投票,结果被合并。有关如何合并的详细信息,请参见 https://scikit-learn.org/stable/modules/ensemble.html#voting-classifier 。有关平均集成方法为何有用的概述,请参见 https://scikit-learn.org/stable/modules/ensemble.html

确保数据集中值的分布在各个分区中相对均匀至关重要。否则,在任何给定数据分区上学习的参数对于整个数据集来说都会很差。这一点将在后面详细展示。

让我们随机生成一个示例数据集。在实际操作中,您会从存储中加载数据。我们将创建一个包含10个块的 dask.array

[ ]:
from distributed import Client
import dask_ml.datasets
import dask_ml.ensemble

client = Client(n_workers=4, threads_per_worker=1)

X, y = dask_ml.datasets.make_classification(n_samples=1_000_000,
                                            n_informative=10,
                                            shift=2, scale=2,
                                            chunks=100_000)
X

分类

sub-estimator 应该是一个实例化的 scikit-learn-API 兼容的估计器(任何实现了 fit / predict API 的对象,包括管道)。它只需要处理内存中的数据集。我们将使用 sklearn.linear_model.RidgeClassifier

为了获得正确的输出形状,我们要求您在创建估计器时或在子估计器也需要类别时在 .fit 中提供分类问题的 classes

[ ]:
import sklearn.linear_model

subestimator = sklearn.linear_model.RidgeClassifier(random_state=0)
clf = dask_ml.ensemble.BlockwiseVotingClassifier(
    subestimator,
    classes=[0, 1]
)
clf

我们可以正常训练。这将 独立地Xy 的每个分区上拟合 subestimator 的一个副本。

[ ]:
clf.fit(X, y)

所有拟合的估计器都可以在 .estimators_ 中找到。

[ ]:
clf.estimators_

这些是不同的估计器!它们已经在不同的数据批次上进行了训练,并学习了不同的参数。我们可以绘制前两个模型学习到的 coef_ 的差异来可视化这一点。

[ ]:
import matplotlib.pyplot as plt
import numpy as np
[ ]:
a = clf.estimators_[0].coef_
b = clf.estimators_[1].coef_

fig, ax = plt.subplots()
ax.bar(np.arange(a.shape[1]), (a - b).ravel())
ax.set(xticks=[], xlabel="Feature", title="Difference in Learned Coefficients");

也就是说,整个过程的假设是数据在分区之间的分布相对均匀。集成学习中每个成员学到的参数应该相对相似,因此在应用于相同数据时会给出相对相似的预测。

当你 predict 时,结果将具有与你要预测的输入数组相同的分块模式(这不需要与训练数据的分区匹配)。

[ ]:
preds = clf.predict(X)
preds

这将生成一组任务,

  1. 对每个子估计器(在我们的例子中是10个)调用 subestimator.predict(chunk)

  2. 将这些预测连接在一起

  3. 以某种方式将预测平均为一个总体预测

我们使用了默认的 voting="hard" 策略,这意味着我们只是选择得票数最高的类别。如果前两个子估计器为第一行选择了类别 0,而其他八个选择了类别 1,那么该行的最终预测将是类别 1

[ ]:
preds[:10].compute()

使用 voting="soft" 我们可以访问 predict_proba,只要子估计器有 predict_proba 方法。这些子估计器应该经过良好校准,以使预测有意义。更多信息请参见 概率校准

[ ]:
subestimator = sklearn.linear_model.LogisticRegression(random_state=0)
clf = dask_ml.ensemble.BlockwiseVotingClassifier(
    subestimator,
    classes=[0, 1],
    voting="soft"
)
clf.fit(X, y)
[ ]:
proba = clf.predict_proba(X)
proba[:5].compute()

这里的阶段与 voting="hard" 的情况类似。现在我们不是取多数投票,而是取每个子估计器预测的概率的平均值。

回归

回归非常相似。主要区别在于没有投票;估计器的预测总是通过平均来减少。

[ ]:
X, y = dask_ml.datasets.make_regression(n_samples=1_000_000,
                                        chunks=100_000,
                                        n_features=20)
X
[ ]:
subestimator = sklearn.linear_model.LinearRegression()
clf = dask_ml.ensemble.BlockwiseVotingRegressor(
    subestimator,
)
clf.fit(X, y)
[ ]:
clf.predict(X)[:5].compute()

与 Dask-ML 一样,评分是并行完成的(如果你连接到一个集群,则分布在集群上)。

[ ]:
clf.score(X, y)

非均匀分布数据的风险

最后,必须再次强调,在使用这些集成方法之前,您的数据应在分区之间均匀分布。如果不是,那么您最好从每个分区中抽取行,并将其拟合到一个单一的分类器中。我们所说的“均匀”并不是指“来自均匀概率分布”。只是数据分布不应该有明显的分区模式。

让我们通过一个例子来演示这一点。我们将生成一个在分区之间具有明显趋势的数据集。这可能代表一些非平稳的时间序列,尽管它也可能出现在其他上下文中(例如按地理、年龄等分区的数据)。

[ ]:
import dask.array as da
import dask.delayed
import sklearn.datasets
[ ]:
def clone_and_shift(X, y, i):
    X = X.copy()
    X += i + np.random.random(X.shape)
    y += 25 * (i + np.random.random(y.shape))
    return X, y
[ ]:
# Make a base dataset that we'll clone and shift
X, y = sklearn.datasets.make_regression(n_features=4, bias=2, random_state=0)

# Clone and shift 10 times, gradually increasing X and y for each partition
Xs, ys = zip(*[dask.delayed(clone_and_shift, nout=2)(X, y, i) for i in range(10)])
Xs = [da.from_delayed(x, shape=X.shape, dtype=X.dtype) for x in Xs]
ys = [da.from_delayed(y_, shape=y.shape, dtype=y.dtype) for y_ in ys]
X2 = da.concatenate(Xs)
y2 = da.concatenate(ys)

让我们绘制一些点的样本,根据数据来自哪个分区进行着色。

[ ]:
fig, ax = plt.subplots()
ax.scatter(X2[::5, 0], y2[::5], c=np.arange(0, len(X2), 5) // 100, cmap="Set1",
           label="Partition")
ax.set(xlabel="Feature 0", ylabel="target", title="Non-stationary data (by partition)");

现在让我们拟合两个估计器:

  1. 在整个数据集上使用一个 BlockwiseVotingRegressor``(它在每个分区上拟合一个 ``LinearRegression

  2. 在整个数据集的一个样本上进行 LinearRegression

[ ]:
subestimator = sklearn.linear_model.LinearRegression()
clf = dask_ml.ensemble.BlockwiseVotingRegressor(
    subestimator,
)
clf.fit(X2, y2)
[ ]:
X_sampled, y_sampled = dask.compute(X2[::10], y2[::10])

subestimator.fit(X_sampled, y_sampled)

比较分数后,我们发现尽管在较少的数据上进行训练,采样的数据集表现要好得多。

[ ]:
clf.score(X2, y2)
[ ]:
subestimator.score(X2, y2)

这表明确保您的需求在分区之间相对均匀。即使包括标准控制来规范化生成非平稳数据的基础力量(例如,时间趋势分量或差分时序数据,地理区域的虚拟变量等),当您的数据集按非均匀变量分区时,这些控制也不足以解决问题。您仍然需要在拟合之前打乱数据,或者仅对适合内存的子样本进行采样并拟合子估计器。