实时笔记本

你可以在 live session Binder 中运行这个笔记本,或者在 Github 上查看它。

Dask 数据框

Dask 数据帧是分块的 Pandas 数据帧

Dask Dataframes 协调许多 Pandas dataframes,沿索引分区。它们支持 Pandas API 的一个大子集。

启动 Dask 客户端以使用仪表板

启动 Dask 客户端是可选的。它将提供一个仪表板,这对于深入了解计算非常有用。

当你在下方创建客户端后,仪表板的链接将会变得可见。我们建议在使用笔记本的另一侧屏幕上打开它。虽然安排窗口可能需要一些努力,但在学习时同时看到两者是非常有用的。

[ ]:
from dask.distributed import Client

client = Client(n_workers=2, threads_per_worker=2, memory_limit="1GB")
client

创建随机数据框

我们创建一个具有以下属性的随机时间序列数据:

  1. 它存储了2000年1月份每秒钟的记录

  2. 它按天分割那个月,将每一天保持为一个分区的数据框

  3. 除了日期时间索引外,它还有名称、ID和数值列。

这是一个大约240 MB的小数据集。通过设置一些 `dask.datasets.timeseries() 参数 <https://docs.dask.org/en/stable/api.html#dask.datasets.timeseries>`__,增加天数或减少数据点之间的时间间隔,可以练习使用更大的数据集。

[ ]:
import dask

df = dask.datasets.timeseries()

与 Pandas 不同,Dask DataFrames 是 惰性 的,这意味着数据仅在需要进行计算时才会加载。这里不会打印数据,而是用省略号 (...) 代替。

[ ]:
df

尽管如此,列名和数据类型是已知的。

[ ]:
df.dtypes

某些操作将自动显示数据。

[ ]:
# This sets some formatting parameters for displayed data.
import pandas as pd

pd.options.display.precision = 2
pd.options.display.max_rows = 10

[ ]:
df.head(3)

使用标准 Pandas 操作

大多数常见的 Pandas 操作可以在 Dask 数据框中以相同的方式使用。此示例展示了如何基于掩码条件对数据进行切片,然后确定 x 列中数据的标准差。

[ ]:
df2 = df[df.y > 0]
df3 = df2.groupby("name").x.std()
df3

注意 df3 中的数据仍然用省略号表示。前面单元格中的所有操作都是惰性操作。当你想要将结果作为 Pandas 数据框或系列时,可以调用 .compute()

如果你在上面启动了 Client(),那么你可以在计算过程中查看状态页面以查看进度。

[ ]:
computed_df = df3.compute()
type(computed_df)

[ ]:
computed_df

请注意,计算的数据现在显示在输出中。

另一个示例计算是聚合多个列,如下所示。同样,仪表板将显示计算的进度。

[ ]:
df4 = df.groupby("name").aggregate({"x": "sum", "y": "max"})
df4.compute()

Dask数据框也可以像Pandas数据框一样进行连接。在这个例子中,我们将``df4``中的聚合数据与``df``中的原始数据进行连接。由于``df``的索引是时间序列,而``df4``是按名称索引的,我们使用``left_on=”name”``和``right_index=True``来定义合并列。我们还为两个数据框中共同的列设置了后缀,以便区分它们。

最后,由于 df4 很小,我们还确保它是一个单分区数据框。

[ ]:
df4 = df4.repartition(npartitions=1)
joined = df.merge(
    df4, left_on="name", right_index=True, suffixes=("_original", "_aggregated")
)
joined.head()

在内存中持久化数据

如果你的数据集有足够的可用RAM,那么你可以将数据持久化在内存中。

这使得未来的计算速度大大提高。

[ ]:
df = df.persist()

时间序列操作

因为 df 有一个日期时间索引,时间序列操作可以高效地工作。

下面的第一个示例将数据重新采样为每小时间隔,以减少数据帧的总大小。然后计算 xy 列的平均值。

[ ]:
df[["x", "y"]].resample("1h").mean().head()

下一个示例以24小时为间隔对数据进行重采样,并绘制平均值。注意,plot() 是在 compute() 之后调用的,因为直到数据被计算后 plot() 才能工作。

[ ]:
%matplotlib inline
df[['x', 'y']].resample('24h').mean().compute().plot();

这个最终示例计算了数据的滚动24小时均值。

[ ]:
df[["x", "y"]].rolling(window="24h").mean().head()

随机访问在索引上是廉价的,但由于 Dask 数据框是惰性的,因此必须计算以实现数据的物化。

[ ]:
df.loc["2000-01-05"]

[ ]:
%time df.loc['2000-01-05'].compute()

设置索引

数据按索引列排序。这允许更快的访问、连接、分组应用操作等。然而,并行排序数据可能会很耗费资源,因此设置索引既重要,但也只需要偶尔进行。在接下来的几个示例中,我们将按 name 列对数据进行分组,因此我们将该列设置为索引以提高效率。

[ ]:
df5 = df.set_index("name")
df5

因为重置此数据集的索引成本高昂,并且我们可以将其放入可用的RAM中,所以我们将其持久化到内存中。

[ ]:
df5 = df5.persist()
df5

Dask 现在知道所有数据的位置,按名称索引。因此,像随机访问这样的操作是廉价且高效的。

[ ]:
%time df5.loc['Alice'].compute()

使用 Scikit-Learn 进行 Groupby Apply

现在我们的数据已经按名称排序,我们可以低成本地进行诸如按名称随机访问或使用自定义函数进行分组应用等操作。

在这里,我们为每个名称训练一个不同的 scikit-learn 线性回归模型。

[ ]:
from sklearn.linear_model import LinearRegression


def train(partition):
    if not len(partition):
        return
    est = LinearRegression()
    est.fit(partition[["x"]].values, partition.y.values)
    return est

train()partition 参数将是 DataFrameGroupBy 中的一个组实例。如果分区中没有数据,我们不需要继续。如果有数据,我们希望拟合线性回归模型,并将其作为该组的值返回。

现在使用 df5 进行操作,其索引是来自 df 的名称,我们可以按 names 列进行分组。这恰好也是索引,但这没关系。然后我们使用 .apply().groupby() 生成的 DataFrameGroupBy 中的每个组运行 train()

meta 参数告诉 Dask 如何创建用于保存 .apply() 结果的 DataFrameSeries。在这种情况下,train() 返回一个单一值,因此 .apply() 将创建一个 Series。这意味着我们需要告诉 Dask 该单一列的类型应该是什么,并可选地给它一个名称。

指定单列的最简单方法是使用元组。列的名称是元组的第一元素。由于这是一系列线性回归,我们将列命名为 "LinearRegression"。元组的第二元素是 train 返回值的类型。在这种情况下,Pandas 会将结果存储为一般的 object,这应该是我们传递的类型。

[ ]:
df6 = df5.groupby("name").apply(
    train, meta=("LinearRegression", object)
).compute()
df6

进一步阅读

要更深入地了解 Dask 数据帧,请参阅 dask 教程,笔记本 04 和 07。