dask.dataframe.from_map
dask.dataframe.from_map¶
- dask.dataframe.from_map(func, *iterables, args=None, meta=None, divisions=None, label=None, token=None, enforce_metadata=True, **kwargs)[源代码]¶
从自定义函数映射创建一个 DataFrame 集合。
from_map
是从 Dask 原生不支持的数据源读取数据时的首选选项,或者当数据源在交给 Dask DataFrames 处理之前需要自定义处理时。例如,像二进制文件或其他没有 IO 连接器的不结构化数据。- 参数
- 函数可调用
用于创建每个分区的函数。如果
func
满足DataFrameIOFunction
协议,将启用列投影。- *可迭代对象可迭代对象
可迭代对象映射到每个输出分区。所有可迭代对象的长度必须相同。这个长度决定了输出集合中的分区数量(每个分区只有一个可迭代对象的元素会传递给
func
)。- 参数列表或元组,可选
要广播到每个输出分区的位置参数。请注意,这些参数将始终在
iterables
位置参数之后传递给func
。- metapd.DataFrame, pd.Series, dict, iterable, tuple, optional
一个空的
pd.DataFrame
或pd.Series
,其数据类型和列名与输出匹配。此元数据对于 dask dataframe 中的许多算法正常工作是必要的。为了便于使用,还提供了一些替代输入。除了DataFrame
之外,还可以提供{name: dtype}
的字典或(name, dtype)
的可迭代对象(注意名称的顺序应与列的顺序匹配)。除了系列之外,还可以使用(name, dtype)
的元组。如果未提供,dask 将尝试推断元数据。这可能会导致意外结果,因此建议提供meta
。有关更多信息,请参阅dask.dataframe.utils.make_meta
。- 部门tuple, str, 可选
沿索引的分区边界。对于元组,请参见 http://www.aidoczh.com/dask/dataframe-design.html#partitions 对于字符串 ‘sorted’,将计算延迟值以找到索引值。假设索引是相互排序的。如果为 None,则不会使用索引信息。
- 标签str, 可选
在输出集合键名中用作函数名称标签的字符串。
- 令牌str, 可选
在输出集合键名中用作“令牌”的字符串。
- 强制元数据bool, 默认 True
是否在运行时强制要求
func
生成的 DataFrame 结构实际上与meta
的结构匹配。这将重命名和重新排序每个分区的列,如果无法执行此操作,则会引发错误,但如果 dtypes 不匹配,则不会引发错误。- **kwargs:
广播到每个输出分区的关键字参数。这些相同的参数将被传递给
func
用于每个输出分区。
参见
dask.layers.DataFrameIOLayer
示例
>>> import pandas as pd >>> import dask.dataframe as dd >>> func = lambda x, size=0: pd.Series([x] * size) >>> inputs = ["A", "B"] >>> dd.from_map(func, inputs, size=2).compute() 0 A 1 A 0 B 1 B dtype: object
此API也可以作为其他基于文件的IO函数的替代品,例如
read_csv
(它们已经是from_map
包装函数):>>> import pandas as pd >>> import dask.dataframe as dd >>> paths = ["0.csv", "1.csv", "2.csv"] >>> dd.from_map(pd.read_csv, paths).head() name timestamp 2000-01-01 00:00:00 Laura 2000-01-01 00:00:01 Oliver 2000-01-01 00:00:02 Alice 2000-01-01 00:00:03 Victor 2000-01-01 00:00:04 Bob
由于
from_map
允许你将任意函数映射到任意数量的可迭代对象上,它可以成为实现其他 DataFrame 创建方法中可能缺少的功能的一种非常方便的手段。例如,如果你碰巧对数据集中每个文件的行数有先验知识,你可以生成一个带有全局 RangeIndex 的 DataFrame 集合:>>> import pandas as pd >>> import numpy as np >>> import dask.dataframe as dd >>> paths = ["0.csv", "1.csv", "2.csv"] >>> file_sizes = [86400, 86400, 86400] >>> def func(path, row_offset): ... # Read parquet file and set RangeIndex offset ... df = pd.read_csv(path) ... return df.set_index( ... pd.RangeIndex(row_offset, row_offset+len(df)) ... ) >>> def get_ddf(paths, file_sizes): ... offsets = [0] + list(np.cumsum(file_sizes)) ... return dd.from_map( ... func, paths, offsets[:-1], divisions=offsets ... ) >>> ddf = get_ddf(paths, file_sizes) >>> ddf.index Dask Index Structure: npartitions=3 0 int64 86400 ... 172800 ... 259200 ... dtype: int64 Dask Name: myfunc, 6 tasks