数据转换#

转换允许你处理和修改你的数据集。你可以组合转换来表达一系列计算。

备注

默认情况下,转换是惰性的。它们不会被执行,直到你通过 迭代数据集保存数据集检查数据集的属性 来触发数据的消费。

本指南向您展示如何:

转换行#

小技巧

如果你的转换是矢量化的,调用 map_batches() 以获得更好的性能。要了解更多信息,请参阅 转换批次

使用 map 转换行#

如果你的转换对每个输入行都返回恰好一行,请调用 map()

import os
from typing import Any, Dict
import ray

def parse_filename(row: Dict[str, Any]) -> Dict[str, Any]:
    row["filename"] = os.path.basename(row["path"])
    return row

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple", include_paths=True)
    .map(parse_filename)
)

传递给 map() 的用户定义函数应为 Callable[[Dict[str, Any]], Dict[str, Any]] 类型。换句话说,您的函数应输入和输出一个字典,其中键为字符串,值为任意类型。例如:

from typing import Any, Dict

def fn(row: Dict[str, Any]) -> Dict[str, Any]:
    # access row data
    value = row["col1"]

    # add data to row
    row["col2"] = ...

    # return row
    return row

使用 flat map 转换行#

如果你的转换为每个输入行返回多行,请调用 flat_map()

from typing import Any, Dict, List
import ray

def duplicate_row(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    return [row] * 2

print(
    ray.data.range(3)
    .flat_map(duplicate_row)
    .take_all()
)
[{'id': 0}, {'id': 0}, {'id': 1}, {'id': 1}, {'id': 2}, {'id': 2}]

传递给 flat_map() 的用户定义函数应为 Callable[[Dict[str, Any]], List[Dict[str, Any]]] 类型。换句话说,您的函数应输入一个键为字符串、值为任意类型的字典,并输出一个与输入类型相同的字典列表,例如:

from typing import Any, Dict, List

def fn(row: Dict[str, Any]) -> List[Dict[str, Any]]:
    # access row data
    value = row["col1"]

    # add data to row
    row["col2"] = ...

    # construct output list
    output = [row, row]

    # return list of output rows
    return output

转换批次#

如果你的转换是向量化的,就像大多数 NumPy 或 pandas 操作一样,批量转换比逐行转换更高效。

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["image"] = np.clip(batch["image"] + 4, 0, 255)
    return batch

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .map_batches(increase_brightness)
)

配置批处理格式#

Ray Data 将批次表示为 NumPy ndarrays 或 pandas DataFrames 的字典。默认情况下,Ray Data 将批次表示为 NumPy ndarrays 的字典。要配置批次类型,请在 map_batches() 中指定 batch_format。你可以从你的函数中返回任一格式,但 batch_format 应与你的函数的输入匹配。

from typing import Dict
import numpy as np
import ray

def increase_brightness(batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    batch["image"] = np.clip(batch["image"] + 4, 0, 255)
    return batch

ds = (
    ray.data.read_images("s3://anonymous@ray-example-data/image-datasets/simple")
    .map_batches(increase_brightness, batch_format="numpy")
)
import pandas as pd
import ray

def drop_nas(batch: pd.DataFrame) -> pd.DataFrame:
    return batch.dropna()

ds = (
    ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    .map_batches(drop_nas, batch_format="pandas")
)

你传递给 map_batches() 的用户定义函数更加灵活。因为你可以用多种方式表示批次(参见 配置批次格式),该函数应该是 Callable[DataBatch, DataBatch] 类型,其中 DataBatch = Union[pd.DataFrame, Dict[str, np.ndarray]]。换句话说,你的函数应该以输入和输出一批数据,你可以将其表示为 pandas DataFrame 或一个带有字符串键和 NumPy ndarrays 值的字典。例如,你的函数可能如下所示:

import pandas as pd

def fn(batch: pd.DataFrame) -> pd.DataFrame:
    # modify batch
    batch = ...

    # return batch
    return output

用户定义的函数也可以是一个Python生成器,它产生批次,因此该函数也可以是类型 Callable[DataBatch, Iterator[[DataBatch]] ,其中 DataBatch = Union[pd.DataFrame, Dict[str, np.ndarray]] 。在这种情况下,您的函数将如下所示:

from typing import Dict, Iterator
import numpy as np

def fn(batch: Dict[str, np.ndarray]) -> Iterator[Dict[str, np.ndarray]]:
    # yield the same batch multiple times
    for _ in range(10):
        yield batch

配置批量大小#

增加 batch_size 可以提高像 NumPy 函数和模型推理这样的矢量化转换的性能。然而,如果你的批次大小过大,你的程序可能会耗尽内存。如果你遇到内存不足的错误,请减少你的 batch_size

有状态转换#

如果你的转换需要昂贵的设置,例如下载模型权重,请使用可调用的Python类而不是函数来使转换具有状态。当使用Python类时,``__init__``方法会在每个工作进程上被调用一次以执行设置。相比之下,函数是无状态的,因此任何设置都必须为每个数据项执行。

在内部,Ray Data 使用任务来执行函数,并使用角色来执行类。要了解更多关于任务和角色的信息,请阅读 Ray Core 关键概念

要使用Python类转换数据,请完成以下步骤:

  1. 实现一个类。在 __init__ 中进行设置,在 __call__ 中转换数据。

  2. 调用 map_batches()map()flat_map()。通过 concurrency 参数传递要使用的并发工作线程数量。每个工作线程并行转换数据的一个分区。固定并发工作线程的数量可以提供最可预测的性能,但您也可以传递一个 (min, max) 元组,允许 Ray Data 自动调整并发工作线程的数量。

from typing import Dict
import numpy as np
import torch
import ray

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32)
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().numpy()
        return batch

ds = (
    ray.data.from_numpy(np.ones((32, 100)))
    .map_batches(TorchPredictor, concurrency=2)
)
from typing import Dict
import numpy as np
import torch
import ray

class TorchPredictor:

    def __init__(self):
        self.model = torch.nn.Identity().cuda()
        self.model.eval()

    def __call__(self, batch: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
        inputs = torch.as_tensor(batch["data"], dtype=torch.float32).cuda()
        with torch.inference_mode():
            batch["output"] = self.model(inputs).detach().cpu().numpy()
        return batch

ds = (
    ray.data.from_numpy(np.ones((32, 100)))
    .map_batches(
        TorchPredictor,
        # Two workers with one GPU each
        concurrency=2,
        # Batch size is required if you're using GPUs.
        batch_size=4,
        num_gpus=1
    )
)

分组和转换组#

要转换组,调用 groupby() 来分组行。然后,调用 map_groups() 来转换组。

from typing import Dict
import numpy as np
import ray

items = [
    {"image": np.zeros((32, 32, 3)), "label": label}
    for _ in range(10) for label in range(100)
]

def normalize_images(group: Dict[str, np.ndarray]) -> Dict[str, np.ndarray]:
    group["image"] = (group["image"] - group["image"].mean()) / group["image"].std()
    return group

ds = (
    ray.data.from_items(items)
    .groupby("label")
    .map_groups(normalize_images)
)
import pandas as pd
import ray

def normalize_features(group: pd.DataFrame) -> pd.DataFrame:
    target = group.drop("target")
    group = (group - group.min()) / group.std()
    group["target"] = target
    return group

ds = (
    ray.data.read_csv("s3://anonymous@air-example-data/iris.csv")
    .groupby("target")
    .map_groups(normalize_features)
)