ray.data.Dataset.flat_map#
- Dataset.flat_map(fn: Callable[[Dict[str, Any]], List[Dict[str, Any]]] | Callable[[Dict[str, Any]], Iterator[List[Dict[str, Any]]]] | _CallableClassProtocol, *, compute: ComputeStrategy | None = None, fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, concurrency: int | Tuple[int, int] | None = None, ray_remote_args_fn: Callable[[], Dict[str, Any]] | None = None, **ray_remote_args) Dataset [源代码]#
将给定的函数应用于每一行,然后展平结果。
如果你的转换为每个输入行返回多行,请使用此方法。
你可以使用一个函数或一个可调用的类来执行转换。对于函数,Ray Data 使用无状态的 Ray 任务。对于类,Ray Data 使用有状态的 Ray 角色。更多信息请参见 有状态转换。
小技巧
map_batches()
也可以修改行数。如果你的转换是向量化的,比如大多数 NumPy 和 pandas 操作,它可能会更快。示例
from typing import Any, Dict, List import ray def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]: return [row] * 2 print( ray.data.range(3) .flat_map(duplicate_row) .take_all() )
[{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]
时间复杂度:O(数据集大小 / 并行度)
- 参数:
fn – 要应用于每个记录的函数或生成器,或可以实例化以创建此类可调用对象的类类型。
compute – 此参数已弃用。请使用
concurrency
参数。fn_args – 传递给
fn
的位置参数,在第一个参数之后。这些参数是底层 Ray 任务的顶级参数。fn_kwargs – 传递给
fn
的关键字参数。这些参数是底层 Ray 任务的顶级参数。fn_constructor_args – 传递给
fn
构造函数的定位参数。只有当fn
是一个可调用类时,你才能提供这个参数。这些参数是底层 Ray 演员构造任务中的顶级参数。fn_constructor_kwargs – 传递给
fn
构造函数的键值参数。只有在fn
是一个可调用类时才能提供这些参数。这些参数是底层 Ray 角色构造任务中的顶级参数。num_cpus – 为每个并行映射工作器保留的CPU数量。
num_gpus – 为每个并行映射工作器预留的GPU数量。例如,指定
num_gpus=1
以请求每个并行映射工作器使用1个GPU。concurrency – 要同时使用的 Ray 工作者的数量。对于一个固定大小的工作者池,大小为
n
,指定concurrency=n
。对于一个从m
到n
工作者的自动扩展工作者池,指定concurrency=(m, n)
。ray_remote_args_fn – 一个返回传递给每个映射工作者的远程参数字典的函数。此参数的目的是为每个执行者/任务生成动态参数,并且将在每次初始化工作者之前被调用。从此字典返回的参数将始终覆盖
ray_remote_args
中的参数。注意:这是一个高级的、实验性的功能。ray_remote_args – 为每个映射工作者从ray请求的额外资源需求。
参见
map_batches()
调用此方法以转换数据批次。
map()
调用此方法以逐行转换。