dask_expr._collection.Index.map_partitions

dask_expr._collection.Index.map_partitions

Index.map_partitions(func, *args, meta=_NoDefault.no_default, enforce_metadata=True, transform_divisions=True, clear_divisions=False, align_dataframes=False, parent_meta=None, **kwargs)

将一个Python函数应用于每个分区

参数
函数函数

应用于每个分区的函数。

args, kwargs

传递给函数的参数和关键字。参数和关键字可以包含 FrameBase 或常规的 Python 对象。类似于 DataFrame 的参数(包括 dask 和 pandas)必须与 self 具有相同数量的分区,或者只包含一个分区。关键字参数、单分区参数和一般 Python 对象参数将被广播到所有分区。

强制元数据bool, 默认 True

是否在运行时强制要求 func 生成的 DataFrame 结构实际上与 meta 的结构匹配。这将重命名和重新排序每个分区的列,如果无法执行此操作,则会引发错误,但如果 dtypes 不匹配,则不会引发错误。

transform_divisionsbool, 默认 True

是否将函数应用于分区,并将这些转换后的分区应用于输出。

clear_divisionsbool, 默认 False

是否应清除分区。如果为 True,transform_divisions 将被忽略。

metapd.DataFrame, pd.Series, dict, iterable, tuple, optional

一个空的 pd.DataFramepd.Series,其数据类型和列名与输出匹配。此元数据对于 dask dataframe 中的许多算法正常工作是必要的。为了便于使用,还提供了一些替代输入。除了 DataFrame 之外,还可以提供 {name: dtype} 的字典或 (name, dtype) 的可迭代对象(注意名称的顺序应与列的顺序匹配)。除了系列之外,还可以使用 (name, dtype) 的元组。如果未提供,dask 将尝试推断元数据。这可能会导致意外结果,因此建议提供 meta。有关更多信息,请参阅 dask.dataframe.utils.make_meta

示例

给定一个 DataFrame、Series 或 Index,例如:

>>> import pandas as pd
>>> import dask_expr as dd
>>> df = pd.DataFrame({'x': [1, 2, 3, 4, 5],
...                    'y': [1., 2., 3., 4., 5.]})
>>> ddf = dd.from_pandas(df, npartitions=2)

可以使用 map_partitions 在每个分区上应用一个函数。可以选择提供额外的参数和关键字,这些参数和关键字将在分区之后传递给函数。

在这里,我们将一个带有参数和关键字的函数应用于一个 DataFrame,结果得到一个 Series:

>>> def myadd(df, a, b=1):
...     return df.x + df.y + a + b
>>> res = ddf.map_partitions(myadd, 1, b=2)
>>> res.dtype
dtype('float64')

这里我们将一个函数应用于一个Series,结果得到一个Series:

>>> res = ddf.x.map_partitions(lambda x: len(x)) # ddf.x is a Dask Series Structure
>>> res.dtype
dtype('int64')

默认情况下,dask 会尝试通过在某些假数据上运行您提供的函数来推断输出元数据。这在许多情况下效果很好,但有时可能会很昂贵,甚至失败。为了避免这种情况,您可以使用 meta 关键字手动指定输出元数据。这可以通过多种形式指定,更多信息请参见 dask.dataframe.utils.make_meta

在这里,我们指定输出是一个没有名称的 Series,并且数据类型为 float64

>>> res = ddf.map_partitions(myadd, 1, b=2, meta=(None, 'f8'))

这里我们映射一个函数,该函数接收一个 DataFrame,并返回一个带有新列的 DataFrame:

>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y))
>>> res.dtypes
x      int64
y    float64
z    float64
dtype: object

与之前一样,输出元数据也可以手动指定。这次我们传入一个 dict,因为输出是一个 DataFrame:

>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y),
...                          meta={'x': 'i8', 'y': 'f8', 'z': 'f8'})

在元数据不变的情况下,您也可以直接传入对象本身:

>>> res = ddf.map_partitions(lambda df: df.head(), meta=ddf)

另请注意,假定索引和分区保持不变。如果正在映射的函数更改了索引/分区,则需要传递 clear_divisions=True

>>> ddf.map_partitions(func, clear_divisions=True)  

你的映射函数通过接受一个特殊的 partition_info 关键字参数来获取其在数据框中的位置信息。

>>> def func(partition, partition_info=None):
...     pass

这将接收以下信息:

>>> partition_info  
{'number': 1, 'division': 3}

对于每个作为dask数据帧的参数和关键字参数,您将收到表示数据帧第n个分区的数字(n)和分区中的第一个索引值(即分区中的第一个索引值)。如果分区未知(例如,如果索引未排序),则分区将返回None。