dask.dataframe.DataFrame.map_partitions
dask.dataframe.DataFrame.map_partitions¶
- DataFrame.map_partitions(func, *args, **kwargs)¶
在每个 DataFrame 分区上应用 Python 函数。
请注意,索引和分区假定保持不变。
- 参数
- 函数函数
应用于每个分区的函数。如果此函数接受特殊的
partition_info
关键字参数,它将接收到关于分区在数据框中相对位置的信息。- args, kwargs
传递给函数的位置参数和关键字参数。位置参数是按分区计算的,而关键字参数在所有分区中共享。分区本身将是第一个位置参数,所有其他参数在其后传递。参数可以是
Scalar
、Delayed
或常规的 Python 对象。类似 DataFrame 的参数(包括 dask 和 pandas)将在应用函数之前重新分区以对齐(如果必要);请参阅align_dataframes
以控制此行为。- 强制元数据bool, 默认 True
是否在运行时强制要求
func
生成的 DataFrame 结构实际上与meta
的结构匹配。这将重命名和重新排序每个分区的列,如果无法执行此操作,则会引发错误,但如果 dtypes 不匹配,则不会引发错误。- transform_divisionsbool, 默认 True
是否将函数应用于分区,并将这些转换后的分区应用于输出。
- 对齐数据框bool, 默认 True
是否重新分区 DataFrame 或 Series 类参数(包括 dask 和 pandas),以便在应用函数之前它们的分区对齐。这要求所有输入都有已知的分区。单分区输入将被拆分为多个分区。
如果为 False,所有输入必须具有相同数量的分区或单个分区。单分区输入将被广播到多分区输入的每个分区。
- metapd.DataFrame, pd.Series, dict, iterable, tuple, optional
一个空的
pd.DataFrame
或pd.Series
,其数据类型和列名与输出匹配。此元数据对于 dask dataframe 中的许多算法正常工作是必要的。为了便于使用,还提供了一些替代输入。除了DataFrame
之外,还可以提供{name: dtype}
的字典或(name, dtype)
的可迭代对象(注意名称的顺序应与列的顺序匹配)。除了系列之外,还可以使用(name, dtype)
的元组。如果未提供,dask 将尝试推断元数据。这可能会导致意外结果,因此建议提供meta
。有关更多信息,请参阅dask.dataframe.utils.make_meta
。
示例
给定一个 DataFrame、Series 或 Index,例如:
>>> import pandas as pd >>> import dask.dataframe as dd >>> df = pd.DataFrame({'x': [1, 2, 3, 4, 5], ... 'y': [1., 2., 3., 4., 5.]}) >>> ddf = dd.from_pandas(df, npartitions=2)
可以使用
map_partitions
在每个分区上应用一个函数。可以选择提供额外的参数和关键字,这些参数和关键字将在分区之后传递给函数。在这里,我们将一个带有参数和关键字的函数应用于一个 DataFrame,结果得到一个 Series:
>>> def myadd(df, a, b=1): ... return df.x + df.y + a + b >>> res = ddf.map_partitions(myadd, 1, b=2) >>> res.dtype dtype('float64')
这里我们将一个函数应用于一个Series,结果得到一个Series:
>>> res = ddf.x.map_partitions(lambda x: len(x)) # ddf.x is a Dask Series Structure >>> res.dtype dtype('int64')
默认情况下,dask 会尝试通过在某些假数据上运行您提供的函数来推断输出元数据。这在许多情况下效果很好,但有时可能会很昂贵,甚至失败。为了避免这种情况,您可以使用
meta
关键字手动指定输出元数据。这可以通过多种形式指定,更多信息请参见dask.dataframe.utils.make_meta
。在这里,我们指定输出是一个没有名称的 Series,并且数据类型为
float64
:>>> res = ddf.map_partitions(myadd, 1, b=2, meta=(None, 'f8'))
这里我们映射一个函数,该函数接收一个 DataFrame,并返回一个带有新列的 DataFrame:
>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y)) >>> res.dtypes x int64 y float64 z float64 dtype: object
与之前一样,输出元数据也可以手动指定。这次我们传入一个
dict
,因为输出是一个 DataFrame:>>> res = ddf.map_partitions(lambda df: df.assign(z=df.x * df.y), ... meta={'x': 'i8', 'y': 'f8', 'z': 'f8'})
在元数据不变的情况下,您也可以直接传入对象本身:
>>> res = ddf.map_partitions(lambda df: df.head(), meta=ddf)
另请注意,假定索引和分区保持不变。如果您映射的函数更改了索引/分区,您需要在之后清除它们:
>>> ddf.map_partitions(func).clear_divisions()
你的映射函数通过接受一个特殊的
partition_info
关键字参数来获取其在数据框中的位置信息。>>> def func(partition, partition_info=None): ... pass
这将接收以下信息:
>>> partition_info {'number': 1, 'division': 3}
对于每个作为dask数据帧的参数和关键字参数,您将收到表示数据帧第n个分区的数字(n)和分区中的第一个索引值(即分区中的第一个索引值)。如果分区未知(例如,如果索引未排序),则分区将返回None。