实时笔记本

你可以在 live session Binder 中运行这个笔记本,或者在 Github 上查看它。

增量训练大型数据集

我们可以在大型数据集上一次训练一批模型。许多 Scikit-Learn 估计器实现了 partial_fit 方法,以实现批量增量学习。

est = SGDClassifier(...)
est.partial_fit(X_train_1, y_train_1)
est.partial_fit(X_train_2, y_train_2)
...

Scikit-Learn 的文档在其 用户指南 中更深入地讨论了这种方法。

本笔记本演示了 Dask-ML 的 Incremental 元估计器的使用,该元估计器自动在 Dask 数组和数据帧上使用 Scikit-Learn 的 partial_fit。Scikit-Learn 处理所有计算,而 Dask 处理数据管理,根据需要加载和移动数据批次。这使得可以在多台机器上分布的大型数据集或不适合内存的数据集上进行扩展,所有这些都使用熟悉的流程。

这个例子展示了…

  • 使用 Dask-ML Incremental 元估计器包装一个实现 partial_fit 的 Scikit-Learn 估计器

  • 在这个包装的估计器上进行训练、预测和评分

尽管此示例使用了 Scikit-Learn 的 SGDClassifer,但 Incremental 元估计器将适用于任何实现了 partial_fit 以及 scikit-learn 基础估计器 API 的类。

4dfb384a28664a76a601537e946bf858 9808d10098bf4cb7ac42c3adcd26b939

设置 Dask

我们首先启动一个 Dask 客户端,以便访问 Dask 仪表板,该仪表板将提供进度和性能指标。

你可以在运行单元格后点击仪表板链接来查看仪表板

[ ]:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1)
client

创建数据

我们创建了一个合成数据集,它足够大以引起兴趣,但又足够小以便快速运行。

我们的数据集有 1,000,000 个样本和 100 个特征。

[ ]:
import dask
import dask.array as da
from dask_ml.datasets import make_classification


n, d = 100000, 100

X, y = make_classification(n_samples=n, n_features=d,
                           chunks=n // 10, flip_y=0.2)
X

有关从实际数据创建Dask数组和数据帧的更多信息,请参阅 Dask数组Dask数据帧 的文档。

分割数据用于训练和测试

我们将数据集分为训练数据和测试数据,以确保我们有公平的测试,从而帮助评估:

[ ]:
from dask_ml.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y)
X_train

在内存中持久化数据

这个数据集足够小,可以放入分布式内存中,因此我们调用 dask.persist 来请求 Dask 执行上述计算并将结果保存在内存中。

[ ]:
X_train, X_test, y_train, y_test = dask.persist(X_train, X_test, y_train, y_test)

如果你在处理一个数据集无法完全放入内存的情况,那么你应该跳过这一步。所有功能仍然可以正常工作,但速度会变慢,并且占用更少的内存。

调用 dask.persist 将把我们的数据保存在内存中,因此在我们多次处理数据时不需要进行计算。例如,如果我们的数据来自CSV文件且未被持久化,那么在每次处理时都需要重新读取CSV文件。如果数据不适合RAM,这是可取的,但否则会减慢我们的计算速度。

预计算类

我们预先从训练数据中计算出类别,这是这个分类示例所必需的:

[ ]:
classes = da.unique(y_train).compute()
classes

创建 Scikit-Learn 模型

我们使用底层的 Scikit-Learn 估计器,一个 SGDClassifier

[ ]:
from sklearn.linear_model import SGDClassifier

est = SGDClassifier(loss='log', penalty='l2', tol=1e-3)

这里我们使用 SGDClassifier,但任何实现了 partial_fit 方法的估计器都可以工作。实现此API的Scikit-Learn模型列表可在此处找到 这里

使用 Dask-ML 的增量元估计器进行包装

我们现在用 `dask_ml.wrappers.Incremental <http://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental>`__ 元估计器来包装我们的 SGDClassifer

[ ]:
from dask_ml.wrappers import Incremental

inc = Incremental(est, scoring='accuracy')

请记住,Incremental 只负责数据管理,而将实际的算法留给底层的 Scikit-Learn 估计器。

注意:我们在 Dask 估计器中设置了评分参数,以告知它处理评分。当使用 Dask 数组作为测试数据时,这会工作得更好。

模型训练

Incremental 实现了一个 fit 方法,该方法将对数据集执行一次循环,在 Dask 数组的每个块上调用 partial_fit

在拟合过程中,您可能希望观察仪表盘,以查看许多批次的顺序拟合情况。

[ ]:
inc.fit(X_train, y_train, classes=classes)
[ ]:
inc.score(X_test, y_test)

多次遍历训练数据

调用 .fit 会遍历我们数据的所有块一次。然而,在许多情况下,我们可能希望多次遍历训练数据。为此,我们可以使用 Incremental.partial_fit 方法和一个 for 循环。

[ ]:
est = SGDClassifier(loss='log', penalty='l2', tol=0e-3)
inc = Incremental(est, scoring='accuracy')
[ ]:
for _ in range(10):
    inc.partial_fit(X_train, y_train, classes=classes)
    print('Score:', inc.score(X_test, y_test))

预测与评分

最后,我们也可以在我们的测试数据上调用 Incremental.predictIncremental.score

[ ]:
inc.predict(X_test)  # Predict produces lazy dask arrays
[ ]:
inc.predict(X_test)[:100].compute()  # call compute to get results
[ ]:
inc.score(X_test, y_test)

了解更多

在本笔记本中,我们介绍了如何使用 Dask-ML 的 Incremental 元估计器来自动化使用实现了 partial_fit 方法的 Scikit-Learn 估计器的增量训练过程。如果你想了解更多关于这个过程的信息,你可能需要查阅以下文档:

  1. https://scikit-learn.org/stable/computing/scaling_strategies.html

  2. Dask-ML 增量 API 文档

  3. 与 Dask-ML 的增量兼容的 Scikit-Learn 估计器列表

  4. 有关模型评估中训练-测试分割的更多信息,请参见 超参数和模型验证