实时笔记本

您可以在 live session Binder 中运行此笔记本,或查看 Github 上的内容。

为小型数据问题扩展Scikit-Learn

这个示例展示了如何使用 Dask 将 scikit-learn 扩展到机器集群,以解决 CPU 密集型问题。我们将在一个小数据集上拟合一个大模型,进行超参数的网格搜索。

这段视频在一个更大的集群上演示了相同的示例。

[ ]:
from IPython.display import YouTubeVideo

YouTubeVideo("5Zf6DQaf7jk")
[ ]:
from dask.distributed import Client, progress
client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')
client

分布式训练

ed9557ce8d0a4c31bcc4d9b5f57c6f3f 0336d4418bae4b50b1cf2d218c68c5eb

Scikit-learn 使用 joblib 进行单机并行处理。这使得您可以在笔记本电脑或工作站的所有核心上训练大多数估计器(任何接受 n_jobs 参数的估计器)。

另外,Scikit-Learn 可以使用 Dask 进行并行处理。这使您可以在不显著改变代码的情况下,利用 集群 的所有核心来训练那些估计器。

这对于在中等规模的数据集上训练大型模型非常有用。当你在搜索许多超参数时,或者在使用包含许多个体估计器的集成方法时,你可能会有一个大型模型。对于太小的数据集,训练时间通常会足够短,以至于集群范围内的并行性没有帮助。对于太大的数据集(大于单个机器的内存),scikit-learn估计器可能无法应对(尽管Dask-ML提供了处理大于内存数据集的其他方法)。

创建 Scikit-Learn 管道

[ ]:
from pprint import pprint
from time import time
import logging

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
[ ]:
# Scale Up: set categories=None to use all the categories
categories = [
    'alt.atheism',
    'talk.religion.misc',
]

print("Loading 20 newsgroups dataset for categories:")
print(categories)

data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()

我们将定义一个小型管道,该管道结合了文本特征提取与一个简单的分类器。

[ ]:
pipeline = Pipeline([
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])

定义参数搜索的网格

对某些参数进行网格搜索。

[ ]:
parameters = {
    'tfidf__use_idf': (True, False),
    'tfidf__norm': ('l1', 'l2'),
    'clf__alpha': (0.00001, 0.000001),
    # 'clf__penalty': ('l2', 'elasticnet'),
    # 'clf__n_iter': (10, 50, 80),
}
[ ]:
grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False)

为了适应这种情况,我们通常会这样写

grid_search.fit(data.data, data.target)

这将使用默认的 joblib 后端(多进程)进行并行处理。要使用 Dask 分布式后端,该后端将使用一组机器来训练模型,请在 parallel_backend 上下文中执行拟合。

[ ]:
import joblib

with joblib.parallel_backend('dask'):
    grid_search.fit(data.data, data.target)

如果在拟合过程中打开了分布式仪表盘,你会注意到每个工作节点都执行了一些拟合任务。

并行, 分布式预测

有时,你在一个小数据集上进行训练,但需要对大量数据进行预测。在这种情况下,你希望你的估计器能够处理用于训练的 NumPy 数组和 pandas DataFrame,以及用于预测的 dask 数组或 DataFrame。`dask_ml.wrappers.ParallelPostFit <http://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit>`__ 正好提供了这一点。它是一个元估计器。它在训练期间不做任何事情;底层估计器(可能是一个 scikit-learn 估计器)可能会在单台机器的内存中。但像 predictscore 等任务是并行化和分布式的。

大多数情况下,使用 ParallelPostFit 就像包装原始估计器一样简单。在 GridSearch 内部使用时,你需要更新参数的键,就像使用任何元估计器一样。唯一的复杂性出现在将 ParallelPostFit 与另一个元估计器(如 GridSearchCV)一起使用时。在这种情况下,你需要用 estimator__ 作为参数名称的前缀。

[ ]:
from sklearn.datasets import load_digits
from sklearn.svm import SVC
from dask_ml.wrappers import ParallelPostFit

我们将加载用于训练的小型 NumPy 数组。

[ ]:
X, y = load_digits(return_X_y=True)
X.shape
[ ]:
svc = ParallelPostFit(SVC(random_state=0, gamma='scale'))

param_grid = {
    # use estimator__param instead of param
    'estimator__C': [0.01, 1.0, 10],
}

grid_search = GridSearchCV(svc, param_grid, cv=3)

并按常规调整。

[ ]:
grid_search.fit(X, y)

我们将通过多次复制训练数据来模拟一个大型的dask数组。实际上,您会从文件系统中加载这个数据。

[ ]:
import dask.array as da
[ ]:
big_X = da.concatenate([
    da.from_array(X, chunks=X.shape)
    for _ in range(10)
])
big_X

predictpredict_proba 这样的操作返回的是 dask 数组,而不是 NumPy 数组。当你进行计算时,工作将在并行、核心外或集群上分布式完成。

[ ]:
predicted = grid_search.predict(big_X)
predicted

此时,预测结果可以写入磁盘,或者在返回给客户端之前进行聚合。