Dask DataFrame 设计

Dask DataFrame 设计

Dask DataFrame 协调许多沿索引排列的 Pandas DataFrame/Series。我们通过以下组件定义一个 Dask DataFrame 对象:

  • 一个带有特殊键集的 Dask 图,用于指定分区,例如 ('x', 0), ('x', 1), ...

  • 一个名称,用于识别Dask图中的哪些键引用此DataFrame,例如 'x'

  • 一个包含适当元数据的空 Pandas 对象(例如,列名、数据类型等)

  • 沿索引的一系列分区边界称为 divisions

元数据

许多 DataFrame 操作依赖于知道列的名称和数据类型。为了跟踪这些信息,所有 Dask DataFrame 对象都有一个 _meta 属性,该属性包含一个具有相同数据类型和名称的空 Pandas 对象。例如:

>>> df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
>>> ddf = dd.from_pandas(df, npartitions=2)
>>> ddf._meta
Empty DataFrame
Columns: [a, b]
Index: []
>>> ddf._meta.dtypes
a     int64
b    object
dtype: object

在内部,Dask DataFrame 尽其所能将此信息通过所有操作传播,因此大多数情况下用户不必担心这一点。通常这是通过对一小部分假数据进行操作评估来完成的,这些假数据可以在 _meta_nonempty 属性中找到:

>>> ddf._meta_nonempty
   a    b
0  1  foo
1  1  foo

有时这种操作可能在用户定义的函数中失败(例如在使用 DataFrame.apply 时),或者可能非常昂贵。对于这些情况,许多函数支持一个可选的 meta 关键字,允许直接指定元数据,避免推断步骤。为了方便,这支持几种选项:

  1. 具有适当数据类型和名称的 Pandas 对象。如果非空,将取一个空切片:

>>> ddf.map_partitions(foo, meta=pd.DataFrame({'a': [1], 'b': [2]}))
  1. 适当名称和数据类型的描述。这可以有几种形式:

  • 一个 {name: dtype} 的字典或一个 (name, dtype) 的可迭代对象指定了一个 DataFrame。注意顺序很重要:meta 中名称的顺序应与列的顺序匹配。

  • 一个 (name, dtype) 的元组指定了一个系列

此关键字适用于所有接受用户提供可调用对象的函数/方法(例如 DataFrame.map_partitionsDataFrame.apply 等),以及许多创建函数(例如 dd.from_delayed)。

分区

在内部,Dask DataFrame 被分割成许多分区,每个分区是一个 Pandas DataFrame。这些 DataFrame 沿着索引垂直分割。当我们的索引是有序的,并且我们知道分区划分的值时,我们就可以巧妙且高效地处理昂贵的算法(例如 groupby、连接等)。

例如,如果我们有一个时间序列索引,那么我们的分区可能会按月份划分:所有一月份的数据将存放在一个分区中,而所有二月份的数据将存放在下一个分区中。在这些情况下,沿着索引进行的操作,如 locgroupbyjoin/merge,可以比在并行情况下更加高效。你可以通过以下字段查看DataFrame的分区数量和划分情况:

>>> df.npartitions
4
>>> df.divisions
['2015-01-01', '2015-02-01', '2015-03-01', '2015-04-01', '2015-04-31']

分区数量和划分值可能在优化过程中发生变化。优化器将尝试创建大小合理的分区,以避免因过多小分区而使调度器负担过重。

分区包括每个分区的索引的最小值和最后一个分区的索引的最大值。在上面的例子中,如果用户搜索一个特定的日期时间范围,那么我们就知道需要检查哪些分区,哪些可以忽略:

>>> df.loc['2015-01-20': '2015-02-10']  # Must inspect first two partitions

通常我们没有关于分区的此类信息。例如,在读取CSV文件时,如果没有额外的用户输入,我们不知道数据是如何划分的。在这种情况下,.divisions 将全部为 None

>>> df.divisions
[None, None, None, None, None]

在这些情况下,任何需要一个干净分区且分区已知的 DataFrame 的操作都必须执行排序。这通常可以通过调用 df.set_index(...) 来实现。

分组

默认情况下,groupby 会根据几个不同的因素选择输出分区的数量。它会查看分组键的数量来猜测数据的基数。它会使用这些信息来计算一个基于输入分区数量的因子。你可以通过使用 split_out 参数指定输出分区的数量来覆盖这种行为。

result = df.groupby('id').value.mean()
result.npartitions  # returns 1

result = df.groupby(['id', 'id2']).value.mean()
result.npartitions  # returns 5

result = df.groupby('id').value.mean(split_out=8)
result.npartitions  # returns 8

某些 groupby 聚合函数的 split_out 默认值不同。split_out=True 将保持分区数量不变,这对于那些不会大幅减少行数的操作非常有用。

result = df.groupby('id').value.nunique()
result.npartitions  # returns same as df.npartitions