dask_expr.from_map

dask_expr.from_map

dask_expr.from_map(func, *iterables, args=None, meta=_NoDefault.no_default, divisions=None, label=None, enforce_metadata=False, **kwargs)[源代码]

从自定义函数映射创建一个 DataFrame 集合。

from_map 是从 Dask 原生不支持的数据源读取数据时的首选选项,或者当数据源在交给 Dask DataFrames 处理之前需要自定义处理时。例如,像二进制文件或其他没有 IO 连接器的不结构化数据。

from_map 支持优化器进行列投影。如果函数支持 columns 参数,优化器会尝试将列选择推入 from_map 调用中。

参数
函数可调用

用于创建每个分区的函数。如果该函数具有 columns 关键字参数,则将启用列投影。

*可迭代对象可迭代对象

可迭代对象映射到每个输出分区。所有可迭代对象的长度必须相同。这个长度决定了输出集合中的分区数量(每个分区只有一个可迭代对象的元素会传递给 func)。

参数列表或元组,可选

要广播到每个输出分区的位置参数。请注意,这些参数将始终在 iterables 位置参数之后传递给 func

$META
部门tuple, str, 可选

沿索引的分区边界。对于元组,请参见 http://www.aidoczh.com/dask/dataframe-design.html#partitions 对于字符串 ‘sorted’,将计算延迟值以找到索引值。假设索引是相互排序的。如果为 None,则不会使用索引信息。

标签str, 可选

在输出集合键名中用作函数名称标签的字符串。

令牌str, 可选

在输出集合键名中用作“令牌”的字符串。

强制元数据bool, 默认 True

是否在运行时强制要求 func 生成的 DataFrame 结构实际上与 meta 的结构匹配。这将重命名和重新排序每个分区的列,如果无法执行此操作,则会引发错误,但如果 dtypes 不匹配,则不会引发错误。

**kwargs:

广播到每个输出分区的关键字参数。这些相同的参数将被传递给 func 用于每个输出分区。

示例

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> func = lambda x, size=0: pd.Series([x] * size)
>>> inputs = ["A", "B"]
>>> dd.from_map(func, inputs, size=2).compute()
0    A
1    A
0    B
1    B
dtype: object

优化器将识别在 from_map 之后发生的列选择,并将列参数推入实际的 map 调用中,以尽可能早地删除不必要的列。

>>> def map_function(x, columns=None):
>>>     df = pd.DataFrame({"a": [1, 2], "b": x})
>>>     if columns is not None:
>>>         df = df[columns]
>>>     return df
>>>
>>> dd.from_map(map_function, [1, 2])["b"].compute()
0    1
1    1
0    2
1    2
Name: b, dtype: int64

此API也可以作为其他基于文件的IO函数的替代品,例如 read_csv (它们已经是 from_map 包装函数):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> paths = ["0.csv", "1.csv", "2.csv"]
>>> dd.from_map(pd.read_csv, paths).head()  
                    name
timestamp
2000-01-01 00:00:00   Laura
2000-01-01 00:00:01  Oliver
2000-01-01 00:00:02   Alice
2000-01-01 00:00:03  Victor
2000-01-01 00:00:04     Bob

由于 from_map 允许你将任意函数映射到任意数量的可迭代对象上,它可以成为实现其他 DataFrame 创建方法中可能缺失功能的非常方便的手段。例如,如果你碰巧对数据集中每个文件的行数有先验知识,你可以生成一个带有全局 RangeIndex 的 DataFrame 集合:

>>> import pandas as pd
>>> import numpy as np
>>> import dask.dataframe as dd
>>> paths = ["0.csv", "1.csv", "2.csv"]
>>> file_sizes = [86400, 86400, 86400]
>>> def func(path, row_offset):
...     # Read parquet file and set RangeIndex offset
...     df = pd.read_csv(path)
...     return df.set_index(
...         pd.RangeIndex(row_offset, row_offset+len(df))
...     )
>>> def get_ddf(paths, file_sizes):
...     offsets = [0] + list(np.cumsum(file_sizes))
...     return dd.from_map(
...         func, paths, offsets[:-1], divisions=offsets
...     )
>>> ddf = get_ddf(paths, file_sizes)  
>>> ddf.index  
Dask Index Structure:
npartitions=3
0         int64
86400       ...
172800      ...
259200      ...
dtype: int64
Dask Name: myfunc, 6 tasks