ray.data.Dataset.flat_map#

Dataset.flat_map(fn: Callable[[Dict[str, Any]], List[Dict[str, Any]]] | Callable[[Dict[str, Any]], Iterator[List[Dict[str, Any]]]] | _CallableClassProtocol, *, compute: ComputeStrategy | None = None, fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, concurrency: int | Tuple[int, int] | None = None, ray_remote_args_fn: Callable[[], Dict[str, Any]] | None = None, **ray_remote_args) Dataset[源代码]#

将给定的函数应用于每一行,然后展平结果。

如果你的转换为每个输入行返回多行,请使用此方法。

你可以使用一个函数或一个可调用的类来执行转换。对于函数,Ray Data 使用无状态的 Ray 任务。对于类,Ray Data 使用有状态的 Ray 角色。更多信息请参见 有状态转换

小技巧

map_batches() 也可以修改行数。如果你的转换是向量化的,比如大多数 NumPy 和 pandas 操作,它可能会更快。

示例

from typing import Any, Dict, List
import ray

def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    return [row] * 2

print(
    ray.data.range(3)
    .flat_map(duplicate_row)
    .take_all()
)
[{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]

时间复杂度:O(数据集大小 / 并行度)

参数:
  • fn – 要应用于每个记录的函数或生成器,或可以实例化以创建此类可调用对象的类类型。

  • compute – 此参数已弃用。请使用 concurrency 参数。

  • fn_args – 传递给 fn 的位置参数,在第一个参数之后。这些参数是底层 Ray 任务的顶级参数。

  • fn_kwargs – 传递给 fn 的关键字参数。这些参数是底层 Ray 任务的顶级参数。

  • fn_constructor_args – 传递给 fn 构造函数的定位参数。只有当 fn 是一个可调用类时,你才能提供这个参数。这些参数是底层 Ray 演员构造任务中的顶级参数。

  • fn_constructor_kwargs – 传递给 fn 构造函数的键值参数。只有在 fn 是一个可调用类时才能提供这些参数。这些参数是底层 Ray 角色构造任务中的顶级参数。

  • num_cpus – 为每个并行映射工作器保留的CPU数量。

  • num_gpus – 为每个并行映射工作器预留的GPU数量。例如,指定 num_gpus=1 以请求每个并行映射工作器使用1个GPU。

  • concurrency – 要同时使用的 Ray 工作者的数量。对于一个固定大小的工作者池,大小为 n,指定 concurrency=n。对于一个从 mn 工作者的自动扩展工作者池,指定 concurrency=(m, n)

  • ray_remote_args_fn – 一个返回传递给每个映射工作者的远程参数字典的函数。此参数的目的是为每个执行者/任务生成动态参数,并且将在每次初始化工作者之前被调用。从此字典返回的参数将始终覆盖 ray_remote_args 中的参数。注意:这是一个高级的、实验性的功能。

  • ray_remote_args – 为每个映射工作者从ray请求的额外资源需求。

参见

map_batches()

调用此方法以转换数据批次。

map()

调用此方法以逐行转换。