dask_expr._collection.Series.map_overlap

dask_expr._collection.Series.map_overlap

Series.map_overlap(func, before, after, *args, meta=_NoDefault.no_default, enforce_metadata=True, transform_divisions=True, clear_divisions=False, align_dataframes=False, **kwargs)

对每个分区应用一个函数,与相邻分区共享行。

这对于实现窗口函数非常有用,例如 df.rolling(...).mean()df.diff()

参数
函数函数

应用于每个分区的函数。

之前int, timedelta 或字符串 timedelta

从分区 i - 1 的末尾开始,添加到分区 i 的行。

之后int, timedelta 或字符串 timedelta

从分区 i + 1 的开头追加到分区 i 的行。

args, kwargs

传递给函数的位置参数和关键字参数。位置参数是按分区计算的,而关键字参数在所有分区中共享。分区本身将是第一个位置参数,所有其他参数在其后传递。参数可以是 ScalarDelayed 或常规的 Python 对象。类似 DataFrame 的参数(包括 dask 和 pandas)将在应用函数之前重新分区以对齐(如果必要);请参阅 align_dataframes 以控制此行为。

强制元数据bool, 默认 True

是否在运行时强制要求 func 生成的 DataFrame 结构实际上与 meta 的结构匹配。这将重命名和重新排序每个分区的列,如果无法执行此操作,则会引发错误,但如果 dtypes 不匹配,则不会引发错误。

transform_divisionsbool, 默认 True

是否将函数应用于分区,并将这些转换后的分区应用于输出。

对齐数据框bool, 默认 True

是否重新分区 DataFrame 或 Series 类参数(包括 dask 和 pandas),以便在应用函数之前它们的分区对齐。这要求所有输入都有已知的分区。单分区输入将被拆分为多个分区。

如果为 False,所有输入必须具有相同数量的分区或单个分区。单分区输入将被广播到多分区输入的每个分区。

metapd.DataFrame, pd.Series, dict, iterable, tuple, optional

一个空的 pd.DataFramepd.Series,其数据类型和列名与输出匹配。此元数据对于 dask dataframe 中的许多算法正常工作是必要的。为了便于使用,还提供了一些替代输入。除了 DataFrame 之外,还可以提供 {name: dtype} 的字典或 (name, dtype) 的可迭代对象(注意名称的顺序应与列的顺序匹配)。除了系列之外,还可以使用 (name, dtype) 的元组。如果未提供,dask 将尝试推断元数据。这可能会导致意外结果,因此建议提供 meta。有关更多信息,请参阅 dask.dataframe.utils.make_meta

注释

给定正整数 beforeafter,以及一个函数 funcmap_overlap 执行以下操作:

  1. 在每个分区 i 的开头添加分区 i - 1 末尾的 before 行。第一个分区没有行被添加。

  2. after 行附加到每个分区 i ,从分区 i + 1 的开头开始。最后一个分区没有行附加。

  3. 对每个分区应用 func ,如果提供了额外的 argskwargs ,则传递它们。

  4. 从除第一个分区外的所有分区的开头修剪 before 行。

  5. 从除最后一个分区外的所有分区末尾修剪 after 行。

示例

给定一个 DataFrame、Series 或 Index,例如:

>>> import pandas as pd
>>> import dask_expr as dd
>>> df = pd.DataFrame({'x': [1, 2, 4, 7, 11],
...                    'y': [1., 2., 3., 4., 5.]})
>>> ddf = dd.from_pandas(df, npartitions=2)

一个大小为2的尾随移动窗口的滚动和可以通过在每个分区之前重叠2行来计算,然后映射对 df.rolling(2).sum() 的调用:

>>> ddf.compute()
    x    y
0   1  1.0
1   2  2.0
2   4  3.0
3   7  4.0
4  11  5.0
>>> ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute()
      x    y
0   NaN  NaN
1   3.0  3.0
2   6.0  5.0
3  11.0  7.0
4  18.0  9.0

pandas 的 diff 方法计算按周期数(可以是正数或负数)移动的离散差值。这可以通过在预先添加/追加相应行数后,将调用映射到 df.diff 到每个分区来实现,具体取决于符号:

>>> def diff(df, periods=1):
...     before, after = (periods, 0) if periods > 0 else (0, -periods)
...     return df.map_overlap(lambda df, periods=1: df.diff(periods),
...                           periods, 0, periods=periods)
>>> diff(ddf, 1).compute()
     x    y
0  NaN  NaN
1  1.0  1.0
2  2.0  1.0
3  3.0  1.0
4  4.0  1.0

如果你有一个 DatetimeIndex,你可以使用 pd.Timedelta 来进行基于时间的窗口操作,或者任何可以转换为 pd.Timedelta 的字符串:

>>> ts = pd.Series(range(10), index=pd.date_range('2017', periods=10))
>>> dts = dd.from_pandas(ts, npartitions=2)
>>> dts.map_overlap(lambda df: df.rolling('2D').sum(),
...                 pd.Timedelta('2D'), 0).compute()
2017-01-01     0.0
2017-01-02     1.0
2017-01-03     3.0
2017-01-04     5.0
2017-01-05     7.0
2017-01-06     9.0
2017-01-07    11.0
2017-01-08    13.0
2017-01-09    15.0
2017-01-10    17.0
Freq: D, dtype: float64