实时笔记本

您可以在 live session Binder 中运行此笔记本,或查看 Github 上的内容。

数据框:分组

本笔记本使用 Pandas 的 groupby-aggregate 和 groupby-apply 在可扩展的 Dask 数据框上进行操作。它将讨论常见用法和最佳实践。

启动 Dask 客户端以使用仪表板

启动 Dask 客户端是可选的。它将提供一个仪表板,这对于深入了解计算非常有用。

当你在下方创建客户端后,仪表板的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。虽然安排窗口可能需要一些努力,但在学习时同时看到两者是非常有用的。

[ ]:
from dask.distributed import Client
client = Client(n_workers=1, threads_per_worker=4, processes=True, memory_limit='2GB')
client

人工数据集

我们创建一个人工时间序列数据集,以帮助我们进行groupby操作

[ ]:
import dask
df = dask.datasets.timeseries()
df

这个数据集足够小,可以放入集群的内存中,因此我们现在将其持久化。

如果你的数据集变得太大而无法放入内存,你可以跳过这一步。

[ ]:
df = df.persist()

分组聚合

Dask 数据帧实现了 Pandas groupby API 的常用子集(参见 Pandas Groupby 文档)。

我们从 groupby 聚合开始。假设组的数量较少(少于一百万),这些通常是相当高效的。

[ ]:
df.groupby('name').x.mean().compute()

性能将取决于你进行的聚合(均值 vs 标准差)、你分组的键(名称 vs ID)以及总组数

[ ]:
%time _ = df.groupby('id').x.mean().compute()
[ ]:
%time _ = df.groupby('name').x.mean().compute()
[ ]:
%time df.groupby('name').agg({'x': ['mean', 'std'], 'y': ['mean', 'count']}).compute().head()

这与Pandas的情况相同。一般来说,Dask.dataframe 的 groupby-聚合 性能大致与 Pandas 的 groupby-聚合 性能相同,只是更具扩展性。

你可以在 Pandas 文档 中阅读更多关于 Pandas 常见聚合的内容。

自定义聚合

Dask 数据框聚合可用于自定义聚合(参见 Dask 数据框聚合文档

许多组

默认情况下,groupby-聚合(如groupby-mean或groupby-sum)将结果作为单分区Dask数据帧返回。它们的结果*通常*非常小,因此*通常*这是一个不错的选择。

然而,有时人们希望对*许多*组(数百万或更多)进行分组聚合。在这些情况下,完整的结果可能无法放入单个 Pandas 数据框输出中,您可能需要将输出拆分为多个分区。您可以使用 split_out= 参数来控制这一点。

[ ]:
# Computational graph of a single output aggregation (for a small number of groups, like 1000)
df.groupby('name').x.mean().visualize(node_attr={'penwidth': '6'})
[ ]:
# Computational graph of an aggregation to four outputs (for a larger number of groups, like 1000000)
df.groupby('id').x.mean(split_out=4).visualize(node_attr={'penwidth': '6'})

Groupby Apply

分组聚合通常非常快,因为它们可以很容易地分解为已知的操作。数据不需要移动太多,我们可以在网络中传递小的中间值。

然而,对于某些操作,要应用的函数需要来自给定组的所有数据(例如,名为“Alice”的所有记录)。这将导致大量的通信并更加昂贵,但仍然可以通过Groupby-apply方法实现。如果可以使用groupby-聚合,则应避免这种情况。

在下面的例子中,我们在每个人的名字上训练一个简单的 Scikit-Learn 机器学习模型。

[ ]:
from sklearn.linear_model import LinearRegression

def train(partition):
    if partition.empty:
        return
    est = LinearRegression()
    est.fit(partition[['x', 'id']].values, partition.y.values)
    return est
[ ]:
%time df.groupby('name').apply(train, meta=object).compute().sort_index()
[ ]: