Dask DataFrame 最佳实践

开始使用 Dask DataFrame 很容易,但要 很好地 使用它确实需要一些经验。本页包含 Dask DataFrame 最佳实践的建议,并包括常见问题的解决方案。

使用 Pandas

对于适合放入内存的数据,pandas 通常比 Dask DataFrame 更快且更易于使用。虽然“大数据”工具可能令人兴奋,但在正常数据工具仍然适用的情况下,它们几乎总是不如后者。

减少,然后使用 pandas

与上述类似,即使你有一个大型数据集,在你的计算过程中可能会有一个点,你已经将事物减少到一个更易管理的水平。此时,你可能想要切换到 pandas。

df = dd.read_parquet('my-giant-file.parquet')
df = df[df.name == 'Alice']              # Select a subsection
result = df.groupby('id').value.mean()   # Reduce to a smaller size
result = result.compute()                # Convert to pandas dataframe
result...                                # Continue working with pandas

Pandas 性能提示适用于 Dask DataFrame

通常的 pandas 性能提示,如避免使用 apply,使用向量化操作,使用分类数据等,同样适用于 Dask DataFrame。关于这个主题,可以阅读 Tom AugspurgerModern Pandas

使用索引

Dask DataFrame 可以选择沿单个索引列进行排序。对此列的一些操作可以非常快。例如,如果您的数据集按时间排序,您可以快速选择某一天的数据,执行时间序列连接等。您可以通过查看 df.known_divisions 属性来检查您的数据是否已排序。您可以使用 .set_index(column_name) 方法设置索引列。不过,此操作开销较大,因此请谨慎使用(见下文):

df = df.set_index('timestamp')  # set the index to make some operations fast

df.loc['2001-01-05':'2001-01-12']  # this is very fast if you have an index
df.merge(df2, left_index=True, right_index=True)  # this is also very fast

更多信息,请参阅关于 数据框分区 的文档。

避免全数据洗牌

设置索引是一个重要但开销较大的操作(见上文)。你应该尽量少做,并且在之后应该持久化(见下文)。

一些操作,如 set_indexmerge/join ,在并行或分布式环境中比在单机内存中执行更为困难。特别是,重排操作 会变得非常依赖通信。例如,如果你的数据按客户ID排列,但现在你想按时间排列,所有分区都必须相互通信以交换数据片段。这在集群环境中可能是一个密集的过程。

所以,一定要设置索引,但尽量少做。

df = df.set_index('column_name')  # do this infrequently

此外,set_index 在某些情况下有一些可以加速的选项。例如,如果你知道你的数据集是排序的,或者你已经知道它是按哪些值划分的,你可以提供这些信息来加速 set_index 操作。更多信息,请参阅 set_index 文档字符串

df2 = df.set_index(d.timestamp, sorted=True)

智能持久化

备注

本节仅与分布式系统上的用户相关。

警告

persist 在查询优化器中存在一些缺点。它会阻止所有优化,并阻止我们将列投影或过滤器推入IO层。仅在绝对必要或之后需要完整数据集时才谨慎使用 persist。

通常 DataFrame 的工作负载如下所示:

  1. 从文件加载数据

  2. 过滤数据到特定子集

  3. 打乱数据以设置智能索引

  4. 在此索引数据之上进行的几种复杂查询

通常理想的做法是加载、过滤和打乱数据一次,并将此结果保存在内存中。之后,每个复杂的查询都可以基于这个内存中的数据,而不是每次都要重复完整的加载-过滤-打乱过程。为此,请使用 client.persist 方法:

df = dd.read_csv('s3://bucket/path/to/*.csv')
df = df[df.balance < 0]
df = client.persist(df)

df = df.set_index('timestamp')
df = client.persist(df)

>>> df.customer_id.nunique().compute()
18452844

>>> df.groupby(df.city).size().compute()
...

持久化是重要的,因为 Dask DataFrame 默认是 懒惰的。这是一种告诉集群它应该开始执行你迄今为止定义的计算,并尝试将这些结果保存在内存中的方式。你将得到一个新的 DataFrame,它在语义上等同于你的旧 DataFrame,但现在指向正在运行的数据。你的旧 DataFrame 仍然指向懒惰的计算。

# Don't do this
client.persist(df)  # persist doesn't change the input in-place

# Do this instead
df = client.persist(df)  # replace your old lazy DataFrame

重新分区以减少开销

你的 Dask DataFrame 被分割成许多 pandas DataFrame。我们有时称这些为“分区”,而且通常分区的数量是为你决定的。例如,它可能是你从中读取的 CSV 文件的数量。然而,随着时间的推移,当你通过过滤或连接来减少或增加 pandas DataFrame 的大小时,可能需要重新考虑你需要多少分区。分区过多或过少都有成本。

Dask DataFrame 的各个分区是 pandas DataFrame。Dask DataFrame 最佳实践之一是重新分区这些分区。

分区应舒适地适应内存(小于一吉字节),但也不应过多。每个分区上的每个操作都需要中央调度器处理几百微秒。如果你有几千个任务,这几乎不会被注意到,但如果可能的话,减少数量是很好的。

一个常见的情况是,你将大量数据加载到合理大小的分区中(Dask 的默认设置做出了不错的选择),但随后你将数据集过滤到只有原始数据的一小部分。此时,明智的做法是将许多小分区重新组合成几个较大的分区。你可以通过使用 dask.dataframe.DataFrame.repartition 方法来实现这一点:

df = dd.read_csv('s3://bucket/path/to/*.csv')
df = df[df.name == 'Alice']  # only 1/100th of the data
df = df.repartition(npartitions=df.npartitions // 100)

df = df.persist()  # if on a distributed system

这有助于减少开销并提高Pandas向量化操作的效率。你应该力求每个分区大约有100MB的数据。

连接

连接两个DataFrame的成本可能非常高,也可能非常低,这取决于具体情况。在以下情况下成本较低:

  1. 将 Dask DataFrame 与 pandas DataFrame 合并

  2. 将一个 Dask DataFrame 与另一个单分区的 Dask DataFrame 进行连接

  3. 沿着索引连接 Dask DataFrames

并且在以下情况下费用较高:

  1. 将Dask DataFrame沿着不是它们索引的列进行连接

昂贵的案例需要进行洗牌。这没有问题,Dask DataFrame 会很好地完成这项工作,但它会比典型的线性时间操作更昂贵:

dd.merge(a, pandas_df)  # fast
dd.merge(a, b, left_index=True, right_index=True)  # fast
dd.merge(a, b, left_index=True, right_on='id')  # half-fast, half-slow
dd.merge(a, b, left_on='id', right_on='id')  # slow

更多信息请参见 连接

使用 Parquet

Apache Parquet 是一种列式二进制格式。它是存储大量表格数据的实际标准,也是我们推荐的基本表格数据的存储解决方案。

df.to_parquet('path/to/my-results/')
df = dd.read_parquet('path/to/my-results/')

与CSV等格式相比,Parquet带来了以下优势:

  1. 阅读和写作速度更快,通常快4-10倍

  2. 存储更紧凑,通常可以节省2-5倍的空间。

  3. 它有一个模式,因此关于列的类型没有歧义。这避免了令人困惑的错误。

  4. 它支持更多高级的数据类型,如分类数据、正确的日期时间等。

  5. 它更具可移植性,并且可以与其他系统如数据库或Apache Spark一起使用。

  6. 根据数据的划分方式,Dask 可以识别排序列,有时可以更高效地挑选出数据子集。

更多详情请参见 Dask 数据帧和 Parquet