dask_expr._collection.DataFrame.map_分区
dask_expr._collection.DataFrame.map_分区¶
- DataFrame.map_partitions(func, *args, meta=_NoDefault.no_default, enforce_metadata=True, transform_divisions=True, clear_divisions=False, align_dataframes=False, parent_meta=None, **kwargs)¶
将一个Python函数应用于每个分区
- 参数
- 函数函数
应用于每个分区的函数。
- args, kwargs
传递给函数的参数和关键字。参数和关键字可以包含
FrameBase
或常规的 Python 对象。类似于 DataFrame 的参数(包括 dask 和 pandas)必须与self
具有相同数量的分区,或者只包含一个分区。关键字参数、单分区参数和一般 Python 对象参数将被广播到所有分区。- 强制元数据bool, 默认 True
是否在运行时强制要求
func
生成的 DataFrame 结构实际上与meta
的结构匹配。这将重命名和重新排序每个分区的列,如果无法执行此操作,则会引发错误,但如果 dtypes 不匹配,则不会引发错误。- transform_divisionsbool, 默认 True
是否将函数应用于分区,并将这些转换后的分区应用于输出。
- clear_divisionsbool, 默认 False
是否应清除分区。如果为 True,transform_divisions 将被忽略。
- metapd.DataFrame, pd.Series, dict, iterable, tuple, optional
一个空的
pd.DataFrame
或pd.Series
,其数据类型和列名与输出匹配。此元数据对于 dask dataframe 中的许多算法正常工作是必要的。为了便于使用,还提供了一些替代输入。除了DataFrame
之外,还可以提供{name: dtype}
的字典或(name, dtype)
的可迭代对象(注意名称的顺序应与列的顺序匹配)。除了系列之外,还可以使用(name, dtype)
的元组。如果未提供,dask 将尝试推断元数据。这可能会导致意外结果,因此建议提供meta
。有关更多信息,请参阅dask.dataframe.utils.make_meta
。
示例
给定一个 DataFrame、Series 或 Index,例如:
>>> import pandas as pd >>> import dask_expr as dd >>> df = pd.DataFrame({'x': [1, 2, 3, 4, 5], ... 'y': [1., 2., 3., 4., 5.]}) >>> ddf = dd.from_pandas(df, npartitions=2)
可以使用
map_partitions
在每个分区上应用一个函数。可以选择提供额外的参数和关键字,这些参数和关键字将在分区之后传递给函数。在这里,我们将一个带有参数和关键字的函数应用于一个 DataFrame,结果得到一个 Series:
>>> def myadd(df, a, b=1): ... return df.x + df.y + a + b >>> res = ddf.map_partitions(myadd, 1, b=2) >>> res.dtype dtype('float64')
这里我们将一个函数应用于一个Series,结果得到一个Series:
>>> res = ddf.x.map_partitions(lambda x: len(x)) # ddf.x is a Dask Series Structure >>> res.dtype dtype('int64')
默认情况下,dask 会尝试通过在某些假数据上运行您提供的函数来推断输出元数据。这在许多情况下效果很好,但有时可能会很昂贵,甚至失败。为了避免这种情况,您可以使用
meta
关键字手动指定输出元数据。这可以通过多种形式指定,更多信息请参见dask.dataframe.utils.make_meta
。在这里,我们指定输出是一个没有名称的 Series,并且数据类型为
float64
:>>> res = ddf.map_partitions(myadd, 1, b=2, meta=(None, 'f8'))
这里我们映射一个函数,该函数接收一个 DataFrame,并返回一个带有新列的 DataFrame:
>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y)) >>> res.dtypes x int64 y float64 z float64 dtype: object
与之前一样,输出元数据也可以手动指定。这次我们传入一个
dict
,因为输出是一个 DataFrame:>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y), ... meta={'x': 'i8', 'y': 'f8', 'z': 'f8'})
在元数据不变的情况下,您也可以直接传入对象本身:
>>> res = ddf.map_partitions(lambda df: df.head(), meta=ddf)
另请注意,假定索引和分区保持不变。如果正在映射的函数更改了索引/分区,则需要传递
clear_divisions=True
。>>> ddf.map_partitions(func, clear_divisions=True)
你的映射函数通过接受一个特殊的
partition_info
关键字参数来获取其在数据框中的位置信息。>>> def func(partition, partition_info=None): ... pass
这将接收以下信息:
>>> partition_info {'number': 1, 'division': 3}
对于每个作为dask数据帧的参数和关键字参数,您将收到表示数据帧第n个分区的数字(n)和分区中的第一个索引值(即分区中的第一个索引值)。如果分区未知(例如,如果索引未排序),则分区将返回None。