实时笔记本

您可以在 live session Binder 中运行此笔记本,或查看 Github 上的内容。

文本向量化管道

此示例展示了如何使用 Dask-ML 对大型文本数据集进行并行分类。它改编自 这个 scikit-learn 示例

主要区别在于

  • 我们将整个模型,包括文本向量化,作为一个管道进行拟合。

  • 我们使用像 Dask BagDask DataframeDask Array 这样的 dask 集合,而不是生成器来处理大于内存的数据集。

[ ]:
from dask.distributed import Client, progress

client = Client(n_workers=2, threads_per_worker=2, memory_limit='2GB')
client

获取数据

Scikit-Learn 提供了一个工具来获取新闻组数据集。

[ ]:
import sklearn.datasets

bunch = sklearn.datasets.fetch_20newsgroups()

来自 scikit-learn 的数据并不 太大 ,因此数据只是返回在内存中。每个文档是一个字符串。我们预测的目标是一个整数,它编码了帖子的主题。

我们将直接将文档和目标加载到一个 dask DataFrame 中。在实践中,对于大于内存的数据集,您可能会使用 dask.bagdask.delayed 从磁盘或云存储加载文档。

[ ]:
import dask.dataframe as dd
import pandas as pd

df = dd.from_pandas(pd.DataFrame({"text": bunch.data, "target": bunch.target}),
                    npartitions=25)

df

text 列中的每一行都有一些元数据和一篇帖子的完整文本。

[ ]:
print(df.head().loc[0, 'text'][:500])

特征哈希

Dask 的 HashingVectorizer 提供了与 scikit-learn 的实现 类似的 API。实际上,Dask-ML 的实现使用了 scikit-learn 的实现,将其应用于输入的 dask.dataframe.Seriesdask.bag.Bag 的每个分区。

转换,一旦我们实际计算结果,就会并行进行并返回一个 dask 数组。

[ ]:
import dask_ml.feature_extraction.text

vect = dask_ml.feature_extraction.text.HashingVectorizer()
X = vect.fit_transform(df['text'])
X

输出数组 X 具有未知的块大小,因为输入的 dask Series 或 Bags 不知道它们自己的长度。

X 中的每个块都是一个 scipy.sparse 矩阵。

[ ]:
X.blocks[0].compute()

这是一个文档-术语矩阵。每一行是原始帖子的哈希表示。

分类管道

我们可以将 HashingVectorizerIncremental 以及像 scikit-learn 的 SGDClassifier 这样的分类器结合起来,创建一个分类管道。

我们将预测主题是否在 comp 类别中。

[ ]:
bunch.target_names
[ ]:
import numpy as np

positive = np.arange(len(bunch.target_names))[['comp' in x for x in bunch.target_names]]
y = df['target'].isin(positive).astype(int)
y
[ ]:
import numpy as np
import sklearn.linear_model
import sklearn.pipeline

import dask_ml.wrappers

由于输入来自一个dask Series,且块大小未知,我们需要指定 assume_equal_chunks=True。这告诉Dask-ML我们知道``X``中的每个分区与``y``中的一个分区匹配。

[ ]:
sgd = sklearn.linear_model.SGDClassifier(
    tol=1e-3
)
clf = dask_ml.wrappers.Incremental(
    sgd, scoring='accuracy', assume_equal_chunks=True
)
pipe = sklearn.pipeline.make_pipeline(vect, clf)

SGDClassifier.partial_fit 需要预先知道所有类别的完整集合。因为我们的 sgd 被封装在一个 Incremental 中,我们需要将其作为 fit 中的 incremental__classes 关键字参数传递。

[ ]:
pipe.fit(df['text'], y,
         incremental__classes=[0, 1]);

像往常一样,Incremental.predict 会延迟返回预测结果,作为一个 dask 数组。

[ ]:
predictions = pipe.predict(df['text'])
predictions

我们可以使用 dask_ml.metrics.accuracy_score 并行计算预测和评分。

[ ]:
dask_ml.metrics.accuracy_score(y, predictions)

这种简单的 HashingVectorizer 和 SGDClassifier 组合在这个预测任务中非常有效。