ray.data.Dataset.map_batches#
- Dataset.map_batches(fn: Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]] | Callable[[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]], Iterator[pyarrow.Table | pandas.DataFrame | Dict[str, numpy.ndarray]]] | _CallableClassProtocol, *, batch_size: int | None | Literal['default'] = 'default', compute: ComputeStrategy | None = None, batch_format: str | None = 'default', zero_copy_batch: bool = False, fn_args: Iterable[Any] | None = None, fn_kwargs: Dict[str, Any] | None = None, fn_constructor_args: Iterable[Any] | None = None, fn_constructor_kwargs: Dict[str, Any] | None = None, num_cpus: float | None = None, num_gpus: float | None = None, concurrency: int | Tuple[int, int] | None = None, ray_remote_args_fn: Callable[[], Dict[str, Any]] | None = None, **ray_remote_args) Dataset [源代码]#
将给定的函数应用于数据批次。
此方法对于数据预处理和执行推理非常有用。了解更多信息,请参阅 转换批次。
你可以使用一个函数或一个可调用的类来执行转换。对于函数,Ray Data 使用无状态的 Ray 任务。对于类,Ray Data 使用有状态的 Ray 角色。更多信息请参见 有状态转换。
小技巧
如果
fn
不改变其输入,设置zero_copy_batch=True
以提高性能并减少内存使用。示例
调用
map_batches()
来转换你的数据。from typing import Dict import numpy as np import ray def add_dog_years(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: batch["age_in_dog_years"] = 7 * batch["age"] return batch ds = ( ray.data.from_items([ {"name": "Luna", "age": 4}, {"name": "Rory", "age": 14}, {"name": "Scout", "age": 9}, ]) .map_batches(add_dog_years) ) ds.show()
{'name': 'Luna', 'age': 4, 'age_in_dog_years': 28} {'name': 'Rory', 'age': 14, 'age_in_dog_years': 98} {'name': 'Scout', 'age': 9, 'age_in_dog_years': 63}
如果你的函数返回大型对象,请分块生成输出。
from typing import Dict import ray import numpy as np def map_fn_with_large_output(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: for i in range(3): yield {"large_output": np.ones((100, 1000))} ds = ( ray.data.from_items([1]) .map_batches(map_fn_with_large_output) )
如果你需要有状态的转换,使用Python可调用类。以下是一个示例,展示了如何使用有状态的转换来创建模型推理工作线程,而无需在每次调用时重新加载模型。
from typing import Dict import numpy as np import torch import ray class TorchPredictor: def __init__(self): self.model = torch.nn.Identity().cuda() self.model.eval() def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]: inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda() with torch.inference_mode(): batch["output"] = self.model(inputs).detach().cpu().numpy() return batch ds = ( ray.data.from_numpy(np.ones((32, 100))) .map_batches( TorchPredictor, # Two workers with one GPU each concurrency=2, # Batch size is required if you're using GPUs. batch_size=4, num_gpus=1 ) )
了解更多,请参阅 端到端:离线批量推理。
- 参数:
fn – 要应用于记录批次的函数或生成器,或可以实例化以创建此类可调用对象的类类型。注意
fn
必须是可序列化的。batch_size – 每个批次中所需行数,或
None
以使用整个块作为批次(块可能包含不同数量的行)。如果batch_size
不能均匀地分割发送到给定映射任务的块,则提供给fn
的批次的实际大小可能小于batch_size
。默认的batch_size
是 1024,使用“default”。compute – 此参数已弃用。请使用
concurrency
参数。batch_format – 如果
"default"
或"numpy"
,批次是Dict[str, numpy.ndarray]
。如果"pandas"
,批次是pandas.DataFrame
。zero_copy_batch – 是否应提供
fn
零拷贝、只读的批次。如果这是True
并且batch_format
转换不需要拷贝,批次将是 Ray 对象存储中数据的零拷贝、只读视图,这可以减少内存使用并提高性能。如果这是False
,批次是可写的,这需要额外的拷贝来保证。如果fn
修改其输入,这需要是False
以避免“赋值目标为只读”或“缓冲源数组为只读”错误。默认是False
。fn_args – 传递给
fn
的位置参数,在第一个参数之后。这些参数是底层 Ray 任务的顶级参数。fn_kwargs – 传递给
fn
的关键字参数。这些参数是底层 Ray 任务的顶级参数。fn_constructor_args – 传递给
fn
构造函数的定位参数。只有当fn
是一个可调用类时,你才能提供这个参数。这些参数是底层 Ray 演员构造任务中的顶级参数。fn_constructor_kwargs – 传递给
fn
构造函数的键值参数。只有在fn
是一个可调用类时才能提供这些参数。这些参数是底层 Ray 角色构造任务中的顶级参数。num_cpus – 为每个并行映射工作器保留的CPU数量。
num_gpus – 为每个并行映射工作器预留的GPU数量。例如,指定
num_gpus=1
以请求每个并行映射工作器使用1个GPU。concurrency – 要同时使用的 Ray 工作者的数量。对于一个固定大小的工作者池,大小为
n
,指定concurrency=n
。对于一个从m
到n
工作者的自动扩展工作者池,指定concurrency=(m, n)
。ray_remote_args_fn – 一个返回传递给每个映射工作者的远程参数字典的函数。此参数的目的是为每个执行者/任务生成动态参数,并且将在每次初始化工作者之前被调用。从此字典返回的参数将始终覆盖
ray_remote_args
中的参数。注意:这是一个高级的、实验性的功能。ray_remote_args – 为每个映射工作者从ray请求的额外资源需求。
备注
如果
batch_size
不能整除分配给某个映射任务的块,提供给fn
的批次大小可能会小于指定的batch_size
。如果设置了
batch_size
并且每个输入块小于batch_size
,Ray Data 会将许多块捆绑在一起作为一项任务的输入,直到它们的总大小等于或大于给定的batch_size
。如果未设置batch_size
,则不会进行捆绑。每个任务只会接收一个输入块。参见
iter_batches()
调用此函数以迭代数据批次。
flat_map()
调用此方法可以从现有记录创建新记录。与
map()
不同,传递给flat_map()
的函数可以返回多个记录。map()
调用此方法以一次转换一条记录。