dask.dataframe.Series.reduction
dask.dataframe.Series.reduction¶
- Series.reduction(chunk, aggregate=None, combine=None, meta=_NoDefault.no_default, token=None, split_every=None, chunk_kwargs=None, aggregate_kwargs=None, combine_kwargs=None, **kwargs)¶
通用行级归约。
- 参数
- 块可调用
对每个分区进行操作的函数。应返回
pandas.DataFrame
、pandas.Series
或标量。- 聚合可调用,可选
用于操作
chunk
连接结果的函数。如果未指定,默认为chunk
。用于在树形归约中进行最终聚合。aggregate
的输入取决于chunk
的输出。如果chunk
的输出是:标量:输入是一个序列,每个分区有一行。
系列:输入是一个 DataFrame,每个分区对应一行。列是输出系列中的行。
DataFrame: 输入是一个 DataFrame,每个分区对应一行。列是输出 DataFrame 中的列。
应返回一个
pandas.DataFrame
、pandas.Series
或一个标量。- 合并可调用,可选
用于在树形归约中操作
chunk
的中间连接结果的函数。如果未提供,默认为aggregate
。输入/输出要求应与上述aggregate
的描述相匹配。- metapd.DataFrame, pd.Series, dict, iterable, tuple, optional
一个空的
pd.DataFrame
或pd.Series
,其数据类型和列名与输出匹配。此元数据对于 dask dataframe 中的许多算法正常工作是必要的。为了便于使用,还提供了一些替代输入。除了DataFrame
之外,还可以提供{name: dtype}
的字典或(name, dtype)
的可迭代对象(注意名称的顺序应与列的顺序匹配)。除了系列之外,还可以使用(name, dtype)
的元组。如果未提供,dask 将尝试推断元数据。这可能会导致意外结果,因此建议提供meta
。有关更多信息,请参阅dask.dataframe.utils.make_meta
。- 令牌str, 可选
用于输出键的名称。
- split_everyint, 可选
在进行树形归约时,将组分区为这个大小的组。如果设置为 False,则不会使用树形归约,所有中间结果将被连接并传递给
aggregate
。默认值是 8。- chunk_kwargsdict, 可选
仅传递给
chunk
的关键字参数。- aggregate_kwargsdict, 可选
仅传递给
aggregate
的关键字参数。- combine_kwargsdict, 可选
仅传递给
combine
的关键字参数。- kwargs
所有剩余的关键字将被传递给
chunk
、combine
和aggregate
。
示例
>>> import pandas as pd >>> import dask.dataframe as dd >>> df = pd.DataFrame({'x': range(50), 'y': range(50, 100)}) >>> ddf = dd.from_pandas(df, npartitions=4)
计算DataFrame中的行数。为此,计算每个分区中的行数,然后对结果求和:
>>> res = ddf.reduction(lambda x: x.count(), ... aggregate=lambda x: x.sum()) >>> res.compute() x 50 y 50 dtype: int64
计算一个 Series 中大于或等于某个值(通过关键字提供)的元素行数。
>>> def count_greater(x, value=0): ... return (x >= value).sum() >>> res = ddf.x.reduction(count_greater, aggregate=lambda x: x.sum(), ... chunk_kwargs={'value': 25}) >>> res.compute() np.int64(25)
同时聚合一个序列的总和和计数:
>>> def sum_and_count(x): ... return pd.Series({'count': x.count(), 'sum': x.sum()}, ... index=['count', 'sum']) >>> res = ddf.x.reduction(sum_and_count, aggregate=lambda x: x.sum()) >>> res.compute() count 50 sum 1225 dtype: int64
对 DataFrame 做同样的操作。这里
chunk
返回一个 DataFrame,这意味着传递给aggregate
的输入是一个 DataFrame,其索引对于 ‘x’ 和 ‘y’ 都有非唯一条目。我们按索引分组,并对每个组求和以得到最终结果。>>> def sum_and_count(x): ... return pd.DataFrame({'count': x.count(), 'sum': x.sum()}, ... columns=['count', 'sum']) >>> res = ddf.reduction(sum_and_count, ... aggregate=lambda x: x.groupby(level=0).sum()) >>> res.compute() count sum x 50 1225 y 50 3725