dask_expr._collection.Series.map_overlap
dask_expr._collection.Series.map_overlap¶
- Series.map_overlap(func, before, after, *args, meta=_NoDefault.no_default, enforce_metadata=True, transform_divisions=True, clear_divisions=False, align_dataframes=False, **kwargs)¶
对每个分区应用一个函数,与相邻分区共享行。
这对于实现窗口函数非常有用,例如
df.rolling(...).mean()
或df.diff()
。- 参数
- 函数函数
应用于每个分区的函数。
- 之前int, timedelta 或字符串 timedelta
从分区
i - 1
的末尾开始,添加到分区i
的行。- 之后int, timedelta 或字符串 timedelta
从分区
i + 1
的开头追加到分区i
的行。- args, kwargs
传递给函数的位置参数和关键字参数。位置参数是按分区计算的,而关键字参数在所有分区中共享。分区本身将是第一个位置参数,所有其他参数在其后传递。参数可以是
Scalar
、Delayed
或常规的 Python 对象。类似 DataFrame 的参数(包括 dask 和 pandas)将在应用函数之前重新分区以对齐(如果必要);请参阅align_dataframes
以控制此行为。- 强制元数据bool, 默认 True
是否在运行时强制要求
func
生成的 DataFrame 结构实际上与meta
的结构匹配。这将重命名和重新排序每个分区的列,如果无法执行此操作,则会引发错误,但如果 dtypes 不匹配,则不会引发错误。- transform_divisionsbool, 默认 True
是否将函数应用于分区,并将这些转换后的分区应用于输出。
- 对齐数据框bool, 默认 True
是否重新分区 DataFrame 或 Series 类参数(包括 dask 和 pandas),以便在应用函数之前它们的分区对齐。这要求所有输入都有已知的分区。单分区输入将被拆分为多个分区。
如果为 False,所有输入必须具有相同数量的分区或单个分区。单分区输入将被广播到多分区输入的每个分区。
- metapd.DataFrame, pd.Series, dict, iterable, tuple, optional
一个空的
pd.DataFrame
或pd.Series
,其数据类型和列名与输出匹配。此元数据对于 dask dataframe 中的许多算法正常工作是必要的。为了便于使用,还提供了一些替代输入。除了DataFrame
之外,还可以提供{name: dtype}
的字典或(name, dtype)
的可迭代对象(注意名称的顺序应与列的顺序匹配)。除了系列之外,还可以使用(name, dtype)
的元组。如果未提供,dask 将尝试推断元数据。这可能会导致意外结果,因此建议提供meta
。有关更多信息,请参阅dask.dataframe.utils.make_meta
。
注释
给定正整数
before
和after
,以及一个函数func
,map_overlap
执行以下操作:在每个分区
i
的开头添加分区i - 1
末尾的before
行。第一个分区没有行被添加。将
after
行附加到每个分区i
,从分区i + 1
的开头开始。最后一个分区没有行附加。对每个分区应用
func
,如果提供了额外的args
和kwargs
,则传递它们。从除第一个分区外的所有分区的开头修剪
before
行。从除最后一个分区外的所有分区末尾修剪
after
行。
示例
给定一个 DataFrame、Series 或 Index,例如:
>>> import pandas as pd >>> import dask_expr as dd >>> df = pd.DataFrame({'x': [1, 2, 4, 7, 11], ... 'y': [1., 2., 3., 4., 5.]}) >>> ddf = dd.from_pandas(df, npartitions=2)
一个大小为2的尾随移动窗口的滚动和可以通过在每个分区之前重叠2行来计算,然后映射对
df.rolling(2).sum()
的调用:>>> ddf.compute() x y 0 1 1.0 1 2 2.0 2 4 3.0 3 7 4.0 4 11 5.0 >>> ddf.map_overlap(lambda df: df.rolling(2).sum(), 2, 0).compute() x y 0 NaN NaN 1 3.0 3.0 2 6.0 5.0 3 11.0 7.0 4 18.0 9.0
pandas 的
diff
方法计算按周期数(可以是正数或负数)移动的离散差值。这可以通过在预先添加/追加相应行数后,将调用映射到df.diff
到每个分区来实现,具体取决于符号:>>> def diff(df, periods=1): ... before, after = (periods, 0) if periods > 0 else (0, -periods) ... return df.map_overlap(lambda df, periods=1: df.diff(periods), ... periods, 0, periods=periods) >>> diff(ddf, 1).compute() x y 0 NaN NaN 1 1.0 1.0 2 2.0 1.0 3 3.0 1.0 4 4.0 1.0
如果你有一个
DatetimeIndex
,你可以使用pd.Timedelta
来进行基于时间的窗口操作,或者任何可以转换为pd.Timedelta
的字符串:>>> ts = pd.Series(range(10), index=pd.date_range('2017', periods=10)) >>> dts = dd.from_pandas(ts, npartitions=2) >>> dts.map_overlap(lambda df: df.rolling('2D').sum(), ... pd.Timedelta('2D'), 0).compute() 2017-01-01 0.0 2017-01-02 1.0 2017-01-03 3.0 2017-01-04 5.0 2017-01-05 7.0 2017-01-06 9.0 2017-01-07 11.0 2017-01-08 13.0 2017-01-09 15.0 2017-01-10 17.0 Freq: D, dtype: float64