ray.data.Dataset.map_batches#

Dataset.map_batches(fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]] | Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], Iterator[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]]] | _CallableClassProtocol, *, batch_size: int | None | Literal['default'] = 'default', compute: ComputeStrategy | None = None, batch_format: str | None = 'default', zero_copy_batch: bool = False, fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, concurrency: int | Tuple[int, int] | None = None, ray_remote_args_fn: Callable[[], Dict[str, Any]] | None = None, **ray_remote_args) Dataset[源代码]#

将给定的函数应用于数据批次。

此方法对于数据预处理和执行推理非常有用。了解更多信息,请参阅 转换批次

你可以使用一个函数或一个可调用的类来执行转换。对于函数,Ray Data 使用无状态的 Ray 任务。对于类,Ray Data 使用有状态的 Ray 角色。更多信息请参见 有状态转换

小技巧

如果 fn 不改变其输入,设置 zero_copy_batch=True 以提高性能并减少内存使用。

示例

调用 map_batches() 来转换你的数据。

from typing import Dict
import numpy as np
import ray

def add_dog_years(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["age_in_dog_years"] = 7 * batch["age"]
    return batch

ds = (
    ray.data.from_items([
        {"name": "Luna", "age": 4},
        {"name": "Rory", "age": 14},
        {"name": "Scout", "age": 9},
    ])
    .map_batches(add_dog_years)
)
ds.show()
{'name': 'Luna', 'age': 4, 'age_in_dog_years': 28}
{'name': 'Rory', 'age': 14, 'age_in_dog_years': 98}
{'name': 'Scout', 'age': 9, 'age_in_dog_years': 63}

如果你的函数返回大型对象,请分块生成输出。

from typing import Dict
import ray
import numpy as np

def map_fn_with_large_output(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    for i in range(3):
        yield {"large_output": np.ones((100, 1000))}

ds = (
    ray.data.from_items([1])
    .map_batches(map_fn_with_large_output)
)

如果你需要有状态的转换,使用Python可调用类。以下是一个示例,展示了如何使用有状态的转换来创建模型推理工作线程,而无需在每次调用时重新加载模型。

from typing import Dict
import numpy as np
import torch
import ray

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity().cuda()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda()
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().cpu().numpy()
        return batch

ds = (
    ray.data.from_numpy(np.ones((32, 100)))
    .map_batches(
        TorchPredictor,
        # Two workers with one GPU each
        concurrency=2,
        # Batch size is required if you're using GPUs.
        batch_size=4,
        num_gpus=1
    )
)

了解更多,请参阅 端到端:离线批量推理

参数:
  • fn – 要应用于记录批次的函数或生成器,或可以实例化以创建此类可调用对象的类类型。注意 fn 必须是可序列化的。

  • batch_size – 每个批次中所需行数,或 None 以使用整个块作为批次(块可能包含不同数量的行)。如果 batch_size 不能均匀地分割发送到给定映射任务的块,则提供给 fn 的批次的实际大小可能小于 batch_size。默认的 batch_size 是 1024,使用“default”。

  • compute – 此参数已弃用。请使用 concurrency 参数。

  • batch_format – 如果 "default""numpy",批次是 Dict[str, numpy.ndarray]。如果 "pandas",批次是 pandas.DataFrame

  • zero_copy_batch – 是否应提供 fn 零拷贝、只读的批次。如果这是 True 并且 batch_format 转换不需要拷贝,批次将是 Ray 对象存储中数据的零拷贝、只读视图,这可以减少内存使用并提高性能。如果这是 False,批次是可写的,这需要额外的拷贝来保证。如果 fn 修改其输入,这需要是 False 以避免“赋值目标为只读”或“缓冲源数组为只读”错误。默认是 False

  • fn_args – 传递给 fn 的位置参数,在第一个参数之后。这些参数是底层 Ray 任务的顶级参数。

  • fn_kwargs – 传递给 fn 的关键字参数。这些参数是底层 Ray 任务的顶级参数。

  • fn_constructor_args – 传递给 fn 构造函数的定位参数。只有当 fn 是一个可调用类时,你才能提供这个参数。这些参数是底层 Ray 演员构造任务中的顶级参数。

  • fn_constructor_kwargs – 传递给 fn 构造函数的键值参数。只有在 fn 是一个可调用类时才能提供这些参数。这些参数是底层 Ray 角色构造任务中的顶级参数。

  • num_cpus – 为每个并行映射工作器保留的CPU数量。

  • num_gpus – 为每个并行映射工作器预留的GPU数量。例如,指定 num_gpus=1 以请求每个并行映射工作器使用1个GPU。

  • concurrency – 要同时使用的 Ray 工作者的数量。对于一个固定大小的工作者池,大小为 n,指定 concurrency=n。对于一个从 mn 工作者的自动扩展工作者池,指定 concurrency=(m, n)

  • ray_remote_args_fn – 一个返回传递给每个映射工作者的远程参数字典的函数。此参数的目的是为每个执行者/任务生成动态参数,并且将在每次初始化工作者之前被调用。从此字典返回的参数将始终覆盖 ray_remote_args 中的参数。注意:这是一个高级的、实验性的功能。

  • ray_remote_args – 为每个映射工作者从ray请求的额外资源需求。

备注

如果 batch_size 不能整除分配给某个映射任务的块,提供给 fn 的批次大小可能会小于指定的 batch_size

如果设置了 batch_size 并且每个输入块小于 batch_size,Ray Data 会将许多块捆绑在一起作为一项任务的输入,直到它们的总大小等于或大于给定的 batch_size。如果未设置 batch_size,则不会进行捆绑。每个任务只会接收一个输入块。

参见

iter_batches()

调用此函数以迭代数据批次。

flat_map()

调用此方法可以从现有记录创建新记录。与 map() 不同,传递给 flat_map() 的函数可以返回多个记录。

map()

调用此方法以一次转换一条记录。