dask.dataframe.Series.reduction

dask.dataframe.Series.reduction

Series.reduction(chunk, aggregate=None, combine=None, meta=_NoDefault.no_default, token=None, split_every=None, chunk_kwargs=None, aggregate_kwargs=None, combine_kwargs=None, **kwargs)

通用行级归约。

参数
可调用

对每个分区进行操作的函数。应返回 pandas.DataFramepandas.Series 或标量。

聚合可调用,可选

用于操作 chunk 连接结果的函数。如果未指定,默认为 chunk。用于在树形归约中进行最终聚合。

aggregate 的输入取决于 chunk 的输出。如果 chunk 的输出是:

  • 标量:输入是一个序列,每个分区有一行。

  • 系列:输入是一个 DataFrame,每个分区对应一行。列是输出系列中的行。

  • DataFrame: 输入是一个 DataFrame,每个分区对应一行。列是输出 DataFrame 中的列。

应返回一个 pandas.DataFramepandas.Series 或一个标量。

合并可调用,可选

用于在树形归约中操作 chunk 的中间连接结果的函数。如果未提供,默认为 aggregate。输入/输出要求应与上述 aggregate 的描述相匹配。

metapd.DataFrame, pd.Series, dict, iterable, tuple, optional

一个空的 pd.DataFramepd.Series,其数据类型和列名与输出匹配。此元数据对于 dask dataframe 中的许多算法正常工作是必要的。为了便于使用,还提供了一些替代输入。除了 DataFrame 之外,还可以提供 {name: dtype} 的字典或 (name, dtype) 的可迭代对象(注意名称的顺序应与列的顺序匹配)。除了系列之外,还可以使用 (name, dtype) 的元组。如果未提供,dask 将尝试推断元数据。这可能会导致意外结果,因此建议提供 meta。有关更多信息,请参阅 dask.dataframe.utils.make_meta

令牌str, 可选

用于输出键的名称。

split_everyint, 可选

在进行树形归约时,将组分区为这个大小的组。如果设置为 False,则不会使用树形归约,所有中间结果将被连接并传递给 aggregate。默认值是 8。

chunk_kwargsdict, 可选

仅传递给 chunk 的关键字参数。

aggregate_kwargsdict, 可选

仅传递给 aggregate 的关键字参数。

combine_kwargsdict, 可选

仅传递给 combine 的关键字参数。

kwargs

所有剩余的关键字将被传递给 chunkcombineaggregate

示例

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> df = pd.DataFrame({'x': range(50), 'y': range(50, 100)})
>>> ddf = dd.from_pandas(df, npartitions=4)

计算DataFrame中的行数。为此,计算每个分区中的行数,然后对结果求和:

>>> res = ddf.reduction(lambda x: x.count(),
...                     aggregate=lambda x: x.sum())
>>> res.compute()
x    50
y    50
dtype: int64

计算一个 Series 中大于或等于某个值(通过关键字提供)的元素行数。

>>> def count_greater(x, value=0):
...     return (x >= value).sum()
>>> res = ddf.x.reduction(count_greater, aggregate=lambda x: x.sum(),
...                       chunk_kwargs={'value': 25})
>>> res.compute()
np.int64(25)

同时聚合一个序列的总和和计数:

>>> def sum_and_count(x):
...     return pd.Series({'count': x.count(), 'sum': x.sum()},
...                      index=['count', 'sum'])
>>> res = ddf.x.reduction(sum_and_count, aggregate=lambda x: x.sum())
>>> res.compute()
count      50
sum      1225
dtype: int64

对 DataFrame 做同样的操作。这里 chunk 返回一个 DataFrame,这意味着传递给 aggregate 的输入是一个 DataFrame,其索引对于 ‘x’ 和 ‘y’ 都有非唯一条目。我们按索引分组,并对每个组求和以得到最终结果。

>>> def sum_and_count(x):
...     return pd.DataFrame({'count': x.count(), 'sum': x.sum()},
...                         columns=['count', 'sum'])
>>> res = ddf.reduction(sum_and_count,
...                     aggregate=lambda x: x.groupby(level=0).sum())
>>> res.compute()
   count   sum
x     50  1225
y     50  3725