dask.dataframe.from_map

dask.dataframe.from_map

dask.dataframe.from_map(func, *iterables, args=None, meta=None, divisions=None, label=None, token=None, enforce_metadata=True, **kwargs)[源代码]

从自定义函数映射创建一个 DataFrame 集合。

from_map 是从 Dask 原生不支持的数据源读取数据时的首选选项,或者当数据源在交给 Dask DataFrames 处理之前需要自定义处理时。例如,像二进制文件或其他没有 IO 连接器的不结构化数据。

参数
函数可调用

用于创建每个分区的函数。如果 func 满足 DataFrameIOFunction 协议,将启用列投影。

*可迭代对象可迭代对象

可迭代对象映射到每个输出分区。所有可迭代对象的长度必须相同。这个长度决定了输出集合中的分区数量(每个分区只有一个可迭代对象的元素会传递给 func)。

参数列表或元组,可选

要广播到每个输出分区的位置参数。请注意,这些参数将始终在 iterables 位置参数之后传递给 func

metapd.DataFrame, pd.Series, dict, iterable, tuple, optional

一个空的 pd.DataFramepd.Series,其数据类型和列名与输出匹配。此元数据对于 dask dataframe 中的许多算法正常工作是必要的。为了便于使用,还提供了一些替代输入。除了 DataFrame 之外,还可以提供 {name: dtype} 的字典或 (name, dtype) 的可迭代对象(注意名称的顺序应与列的顺序匹配)。除了系列之外,还可以使用 (name, dtype) 的元组。如果未提供,dask 将尝试推断元数据。这可能会导致意外结果,因此建议提供 meta。有关更多信息,请参阅 dask.dataframe.utils.make_meta

部门tuple, str, 可选

沿索引的分区边界。对于元组,请参见 http://www.aidoczh.com/dask/dataframe-design.html#partitions 对于字符串 ‘sorted’,将计算延迟值以找到索引值。假设索引是相互排序的。如果为 None,则不会使用索引信息。

标签str, 可选

在输出集合键名中用作函数名称标签的字符串。

令牌str, 可选

在输出集合键名中用作“令牌”的字符串。

强制元数据bool, 默认 True

是否在运行时强制要求 func 生成的 DataFrame 结构实际上与 meta 的结构匹配。这将重命名和重新排序每个分区的列,如果无法执行此操作,则会引发错误,但如果 dtypes 不匹配,则不会引发错误。

**kwargs:

广播到每个输出分区的关键字参数。这些相同的参数将被传递给 func 用于每个输出分区。

参见

dask.layers.DataFrameIOLayer

示例

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> func = lambda x, size=0: pd.Series([x] * size)
>>> inputs = ["A", "B"]
>>> dd.from_map(func, inputs, size=2).compute()
0    A
1    A
0    B
1    B
dtype: object

此API也可以作为其他基于文件的IO函数的替代品,例如 read_csv (它们已经是 from_map 包装函数):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> paths = ["0.csv", "1.csv", "2.csv"]
>>> dd.from_map(pd.read_csv, paths).head()  
                    name
timestamp
2000-01-01 00:00:00   Laura
2000-01-01 00:00:01  Oliver
2000-01-01 00:00:02   Alice
2000-01-01 00:00:03  Victor
2000-01-01 00:00:04     Bob

由于 from_map 允许你将任意函数映射到任意数量的可迭代对象上,它可以成为实现其他 DataFrame 创建方法中可能缺少的功能的一种非常方便的手段。例如,如果你碰巧对数据集中每个文件的行数有先验知识,你可以生成一个带有全局 RangeIndex 的 DataFrame 集合:

>>> import pandas as pd
>>> import numpy as np
>>> import dask.dataframe as dd
>>> paths = ["0.csv", "1.csv", "2.csv"]
>>> file_sizes = [86400, 86400, 86400]
>>> def func(path, row_offset):
...     # Read parquet file and set RangeIndex offset
...     df = pd.read_csv(path)
...     return df.set_index(
...         pd.RangeIndex(row_offset, row_offset+len(df))
...     )
>>> def get_ddf(paths, file_sizes):
...     offsets = [0] + list(np.cumsum(file_sizes))
...     return dd.from_map(
...         func, paths, offsets[:-1], divisions=offsets
...     )
>>> ddf = get_ddf(paths, file_sizes)  
>>> ddf.index  
Dask Index Structure:
npartitions=3
0         int64
86400       ...
172800      ...
259200      ...
dtype: int64
Dask Name: myfunc, 6 tasks